You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/28 02:12:21 UTC

svn commit: r808686 [8/9] - in /hadoop/mapreduce/trunk: ./ ivy/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/ src/test/tools/data/ src/test/tools/data/rumen/ src/test/tools/data/rumen/histogram-tests/...

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonAnySetter;
+
+import org.apache.hadoop.mapred.JobHistory;
+
+/**
+ * A {@link LoggedJob} is a representation of a hadoop job, with the
+ * details of this class set up to meet the requirements of the Jackson JSON
+ * parser/generator.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ */
+public class LoggedJob implements DeepCompare {
+  /** Job type values that may appear in a trace. */
+  public enum JobType {
+    JAVA, PIG, STREAMING, PIPES, OVERALL
+  }
+
+  /** Job priority values that may appear in a trace. */
+  public enum JobPriority {
+    VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH
+  }
+
+  // Names of unknown JSON attributes that have already been reported on
+  // System.err, so that each distinct name is reported only once.
+  private static final TreeSet<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
+
+  // NOTE(review): the numeric fields below default to -1, which presumably
+  // marks a value that was never filled in -- confirm against the writers.
+  String jobID;
+  String user;
+  long computonsPerMapInputByte = -1L;
+  long computonsPerMapOutputByte = -1L;
+  long computonsPerReduceInputByte = -1L;
+  long computonsPerReduceOutputByte = -1L;
+  long submitTime = -1L;
+  long launchTime = -1L;
+  long finishTime = -1L;
+
+  int heapMegabytes = -1;
+  int totalMaps = -1;
+  int totalReduces = -1;
+  JobHistory.Values outcome = JobHistory.Values.SUCCESS;
+  JobType jobtype = JobType.JAVA;
+  JobPriority priority = JobPriority.NORMAL;
+
+  List<String> directDependantJobs = new ArrayList<String>();
+  List<LoggedTask> mapTasks = new ArrayList<LoggedTask>();
+  List<LoggedTask> reduceTasks = new ArrayList<LoggedTask>();
+  List<LoggedTask> otherTasks = new ArrayList<LoggedTask>();
+
+  // There are CDFs for each level of locality -- most local first
+  ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs;
+  // There are CDFs for each level of locality -- most local first
+  ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs;
+
+  LoggedDiscreteCDF successfulReduceAttemptCDF;
+  LoggedDiscreteCDF failedReduceAttemptCDF;
+
+  String queue = null;
+
+  String jobName = null;
+
+  int clusterMapMB = -1;
+  int clusterReduceMB = -1;
+  int jobMapMB = -1;
+  int jobReduceMB = -1;
+
+  long relativeTime = 0;
+
+  double[] mapperTriesToSucceed;
+  double failedMapperFraction;
+
+  /** Creates a job with every field at its default value. */
+  LoggedJob() {
+
+  }
+
+  /**
+   * Creates a job with the given ID and every other field at its default.
+   *
+   * @param jobID
+   *          the ID to store in this job
+   */
+  LoggedJob(String jobID) {
+    super();
+
+    setJobID(jobID);
+  }
+
+  /**
+   * Catch-all setter for JSON attributes that have no dedicated setter. The
+   * value is discarded; the attribute name is printed to {@code System.err}
+   * the first time it is seen.
+   *
+   * @param attributeName
+   *          the name of the unrecognized attribute
+   * @param ignored
+   *          the attribute's value, which is not used
+   */
+  // TODO consider having default readers on the other objects
+  @JsonAnySetter
+  public void setUnknownAttribute(String attributeName, Object ignored) {
+    // add() returns true only when the name was not in the set before.
+    if (alreadySeenAnySetterAttributes.add(attributeName)) {
+      System.err.println("In LoggedJob, we saw the unknown attribute "
+          + attributeName + ".");
+    }
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getJobID() {
+    return jobID;
+  }
+
+  void setJobID(String jobID) {
+    this.jobID = jobID;
+  }
+
+  public JobPriority getPriority() {
+    return priority;
+  }
+
+  void setPriority(JobPriority priority) {
+    this.priority = priority;
+  }
+
+  public long getComputonsPerMapInputByte() {
+    return computonsPerMapInputByte;
+  }
+
+  void setComputonsPerMapInputByte(long computonsPerMapInputByte) {
+    this.computonsPerMapInputByte = computonsPerMapInputByte;
+  }
+
+  public long getComputonsPerMapOutputByte() {
+    return computonsPerMapOutputByte;
+  }
+
+  void setComputonsPerMapOutputByte(long computonsPerMapOutputByte) {
+    this.computonsPerMapOutputByte = computonsPerMapOutputByte;
+  }
+
+  public long getComputonsPerReduceInputByte() {
+    return computonsPerReduceInputByte;
+  }
+
+  void setComputonsPerReduceInputByte(long computonsPerReduceInputByte) {
+    this.computonsPerReduceInputByte = computonsPerReduceInputByte;
+  }
+
+  public long getComputonsPerReduceOutputByte() {
+    return computonsPerReduceOutputByte;
+  }
+
+  void setComputonsPerReduceOutputByte(long computonsPerReduceOutputByte) {
+    this.computonsPerReduceOutputByte = computonsPerReduceOutputByte;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  void setSubmitTime(long submitTime) {
+    this.submitTime = submitTime;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  void setLaunchTime(long startTime) {
+    this.launchTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public int getHeapMegabytes() {
+    return heapMegabytes;
+  }
+
+  void setHeapMegabytes(int heapMegabytes) {
+    this.heapMegabytes = heapMegabytes;
+  }
+
+  public int getTotalMaps() {
+    return totalMaps;
+  }
+
+  void setTotalMaps(int totalMaps) {
+    this.totalMaps = totalMaps;
+  }
+
+  public int getTotalReduces() {
+    return totalReduces;
+  }
+
+  void setTotalReduces(int totalReduces) {
+    this.totalReduces = totalReduces;
+  }
+
+  public JobHistory.Values getOutcome() {
+    return outcome;
+  }
+
+  void setOutcome(JobHistory.Values outcome) {
+    this.outcome = outcome;
+  }
+
+  public JobType getJobtype() {
+    return jobtype;
+  }
+
+  void setJobtype(JobType jobtype) {
+    this.jobtype = jobtype;
+  }
+
+  public List<String> getDirectDependantJobs() {
+    return directDependantJobs;
+  }
+
+  void setDirectDependantJobs(List<String> directDependantJobs) {
+    this.directDependantJobs = directDependantJobs;
+  }
+
+  public List<LoggedTask> getMapTasks() {
+    return mapTasks;
+  }
+
+  void setMapTasks(List<LoggedTask> mapTasks) {
+    this.mapTasks = mapTasks;
+  }
+
+  public List<LoggedTask> getReduceTasks() {
+    return reduceTasks;
+  }
+
+  void setReduceTasks(List<LoggedTask> reduceTasks) {
+    this.reduceTasks = reduceTasks;
+  }
+
+  public List<LoggedTask> getOtherTasks() {
+    return otherTasks;
+  }
+
+  void setOtherTasks(List<LoggedTask> otherTasks) {
+    this.otherTasks = otherTasks;
+  }
+
+  public ArrayList<LoggedDiscreteCDF> getSuccessfulMapAttemptCDFs() {
+    return successfulMapAttemptCDFs;
+  }
+
+  void setSuccessfulMapAttemptCDFs(
+      ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs) {
+    this.successfulMapAttemptCDFs = successfulMapAttemptCDFs;
+  }
+
+  public ArrayList<LoggedDiscreteCDF> getFailedMapAttemptCDFs() {
+    return failedMapAttemptCDFs;
+  }
+
+  void setFailedMapAttemptCDFs(ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs) {
+    this.failedMapAttemptCDFs = failedMapAttemptCDFs;
+  }
+
+  public LoggedDiscreteCDF getSuccessfulReduceAttemptCDF() {
+    return successfulReduceAttemptCDF;
+  }
+
+  void setSuccessfulReduceAttemptCDF(
+      LoggedDiscreteCDF successfulReduceAttemptCDF) {
+    this.successfulReduceAttemptCDF = successfulReduceAttemptCDF;
+  }
+
+  public LoggedDiscreteCDF getFailedReduceAttemptCDF() {
+    return failedReduceAttemptCDF;
+  }
+
+  void setFailedReduceAttemptCDF(LoggedDiscreteCDF failedReduceAttemptCDF) {
+    this.failedReduceAttemptCDF = failedReduceAttemptCDF;
+  }
+
+  public double[] getMapperTriesToSucceed() {
+    return mapperTriesToSucceed;
+  }
+
+  void setMapperTriesToSucceed(double[] mapperTriesToSucceed) {
+    this.mapperTriesToSucceed = mapperTriesToSucceed;
+  }
+
+  public double getFailedMapperFraction() {
+    return failedMapperFraction;
+  }
+
+  void setFailedMapperFraction(double failedMapperFraction) {
+    this.failedMapperFraction = failedMapperFraction;
+  }
+
+  public long getRelativeTime() {
+    return relativeTime;
+  }
+
+  void setRelativeTime(long relativeTime) {
+    this.relativeTime = relativeTime;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public int getClusterMapMB() {
+    return clusterMapMB;
+  }
+
+  void setClusterMapMB(int clusterMapMB) {
+    this.clusterMapMB = clusterMapMB;
+  }
+
+  public int getClusterReduceMB() {
+    return clusterReduceMB;
+  }
+
+  void setClusterReduceMB(int clusterReduceMB) {
+    this.clusterReduceMB = clusterReduceMB;
+  }
+
+  public int getJobMapMB() {
+    return jobMapMB;
+  }
+
+  void setJobMapMB(int jobMapMB) {
+    this.jobMapMB = jobMapMB;
+  }
+
+  public int getJobReduceMB() {
+    return jobReduceMB;
+  }
+
+  void setJobReduceMB(int jobReduceMB) {
+    this.jobReduceMB = jobReduceMB;
+  }
+
+  // The compare1 overloads below each check one field. They return normally
+  // when the two values match and otherwise throw a DeepInequalityException
+  // whose TreePath is built from loc and eltname.
+
+  // Strings: two nulls match; a null never matches a non-null.
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+      TreePath loc, String eltname) throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(JobType c1, JobType c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(JobPriority c1, JobPriority c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(int c1, int c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(double c1, double c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // double arrays: two nulls match; otherwise the lengths and every element
+  // must be equal. An element mismatch is reported with the element's index.
+  private void compare1(double[] c1, double[] c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || c1.length != c2.length) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+
+    for (int i = 0; i < c1.length; ++i) {
+      if (c1[i] != c2[i]) {
+        throw new DeepInequalityException(eltname + " miscompared",
+            new TreePath(loc, eltname, i));
+      }
+    }
+  }
+
+  // Nested DeepCompare values: two nulls match, a single null is reported as
+  // a miscompare, and otherwise the comparison recurses into c1. An index of
+  // -1 means the value is a plain field rather than a list element; any other
+  // index is included in the error message.
+  private void compare1(DeepCompare c1, DeepCompare c2, TreePath loc,
+      String eltname, int index) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname, index);
+
+    if (c1 == null || c2 == null) {
+      if (index == -1) {
+        throw new DeepInequalityException(eltname + " miscompared", recursePath);
+      } else {
+        throw new DeepInequalityException(eltname + "[" + index
+            + "] miscompared", recursePath);
+      }
+    }
+
+    c1.deepCompare(c2, recursePath);
+  }
+
+  // I'll treat this as an atomic object type
+  private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+  }
+
+  // Lists of nested DeepCompare values (tasks, CDFs): two null lists match;
+  // otherwise the sizes must agree and the elements are compared pairwise in
+  // list order. Each pair goes through the null-safe compare1 above, so a null
+  // element yields a DeepInequalityException rather than a
+  // NullPointerException.
+  private void compareDeepLists(List<? extends DeepCompare> c1,
+      List<? extends DeepCompare> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      compare1(c1.get(i), c2.get(i), loc, eltname, i);
+    }
+  }
+
+  /**
+   * Compares this job with {@code comparand} field by field, recursing into
+   * the task lists and CDFs, and stops at the first difference.
+   *
+   * @param comparand
+   *          the object to compare against; it must be a {@link LoggedJob}
+   * @param loc
+   *          the path of this job within the structure being compared, used
+   *          as the parent of the path reported for a mismatching field
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a {@link LoggedJob} or if any field
+   *           differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedJob)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedJob other = (LoggedJob) comparand;
+
+    compare1(jobID, other.jobID, loc, "jobID");
+    compare1(user, other.user, loc, "user");
+
+    compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
+        "computonsPerMapInputByte");
+    compare1(computonsPerMapOutputByte, other.computonsPerMapOutputByte, loc,
+        "computonsPerMapOutputByte");
+    compare1(computonsPerReduceInputByte, other.computonsPerReduceInputByte,
+        loc, "computonsPerReduceInputByte");
+    compare1(computonsPerReduceOutputByte, other.computonsPerReduceOutputByte,
+        loc, "computonsPerReduceOutputByte");
+
+    compare1(submitTime, other.submitTime, loc, "submitTime");
+    compare1(launchTime, other.launchTime, loc, "launchTime");
+    compare1(finishTime, other.finishTime, loc, "finishTime");
+
+    compare1(heapMegabytes, other.heapMegabytes, loc, "heapMegabytes");
+
+    compare1(totalMaps, other.totalMaps, loc, "totalMaps");
+    compare1(totalReduces, other.totalReduces, loc, "totalReduces");
+
+    compare1(outcome, other.outcome, loc, "outcome");
+    compare1(jobtype, other.jobtype, loc, "jobtype");
+    compare1(priority, other.priority, loc, "priority");
+
+    compareStrings(directDependantJobs, other.directDependantJobs, loc,
+        "directDependantJobs");
+
+    compareDeepLists(mapTasks, other.mapTasks, loc, "mapTasks");
+    compareDeepLists(reduceTasks, other.reduceTasks, loc, "reduceTasks");
+    compareDeepLists(otherTasks, other.otherTasks, loc, "otherTasks");
+
+    compare1(relativeTime, other.relativeTime, loc, "relativeTime");
+
+    compareDeepLists(successfulMapAttemptCDFs, other.successfulMapAttemptCDFs,
+        loc, "successfulMapAttemptCDFs");
+    compareDeepLists(failedMapAttemptCDFs, other.failedMapAttemptCDFs, loc,
+        "failedMapAttemptCDFs");
+    compare1(successfulReduceAttemptCDF, other.successfulReduceAttemptCDF, loc,
+        "successfulReduceAttemptCDF", -1);
+    compare1(failedReduceAttemptCDF, other.failedReduceAttemptCDF, loc,
+        "failedReduceAttemptCDF", -1);
+
+    compare1(mapperTriesToSucceed, other.mapperTriesToSucceed, loc,
+        "mapperTriesToSucceed");
+    compare1(failedMapperFraction, other.failedMapperFraction, loc,
+        "failedMapperFraction");
+
+    compare1(queue, other.queue, loc, "queue");
+    compare1(jobName, other.jobName, loc, "jobName");
+
+    compare1(clusterMapMB, other.clusterMapMB, loc, "clusterMapMB");
+    compare1(clusterReduceMB, other.clusterReduceMB, loc, "clusterReduceMB");
+    compare1(jobMapMB, other.jobMapMB, loc, "jobMapMB");
+    compare1(jobReduceMB, other.jobReduceMB, loc, "jobReduceMB");
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link LoggedLocation} is a representation of a point in a hierarchical
+ * network, represented as a series of membership names, broadest first.
+ * 
+ * For example, if your network has <i>hosts</i> grouped into <i>racks</i>, then
+ * in one cluster you might have a node {@code node1} on rack {@code rack1}. This
+ * would be represented as a list of two layers, the two {@link String}s being
+ * {@code "rack1"} and {@code "node1"}.
+ * 
+ * The details of this class are set up to meet the requirements of the Jackson
+ * JSON parser/generator.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ */
+public class LoggedLocation implements DeepCompare {
+  // The full path from the root of the network to the host.
+  //
+  // NOTE that this assumes that the network topology is a tree.
+  List<String> layers = new ArrayList<String>();
+
+  public List<String> getLayers() {
+    return layers;
+  }
+
+  void setLayers(List<String> layers) {
+    this.layers = layers;
+  }
+
+  // I'll treat this as an atomic object type
+  private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+  }
+
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedLocation)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedLocation other = (LoggedLocation) comparand;
+
+    compareStrings(layers, other.layers, loc, "layers");
+
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+/**
+ * A {@link LoggedNetworkTopology} represents a tree that in turn represents a
+ * hierarchy of hosts. The current version requires the tree to have all leaves
+ * at the same level.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ */
+public class LoggedNetworkTopology implements DeepCompare {
+  // Name of this node in the tree.
+  String name;
+  // Subtrees below this node. The recursive constructor leaves this null for
+  // nodes at the deepest level; the no-argument constructor leaves it empty.
+  List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
+
+  public LoggedNetworkTopology() {
+    super();
+  }
+
+  /**
+   * We need this because we have to sort the {@code children} field. That field
+   * is set-valued, but if we sort these fields we ensure that comparisons won't
+   * bogusly fail because the hash table happened to enumerate in a different
+   * order.
+   *
+   * Nodes are ordered by name; a node whose name is {@code null} sorts before
+   * any named node, so the comparator never throws on a nameless node.
+   */
+  static private class TopoSort implements Comparator<LoggedNetworkTopology> {
+    public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
+      if (t1.name == null) {
+        return t2.name == null ? 0 : -1;
+      }
+
+      if (t2.name == null) {
+        return 1;
+      }
+
+      return t1.name.compareTo(t2.name);
+    }
+  }
+
+  /**
+   * Builds the subtree for one level by grouping {@code hosts} on their name
+   * component at {@code level} and recursing once per group.
+   *
+   * @param hosts
+   *          a HashSet of the {@link ParsedHost}
+   * @param name
+   *          the name of this level's host [for recursive descent]
+   * @param level
+   *          the level number
+   */
+  LoggedNetworkTopology(HashSet<ParsedHost> hosts, String name, int level) {
+
+    this.name = name;
+    this.children = null;
+
+    if (level >= ParsedHost.numberOfDistances() - 1) {
+      // Deepest level: this node keeps children == null.
+      return;
+    }
+
+    // Bucket the hosts by their name component at this level.
+    Map<String, HashSet<ParsedHost>> topologies =
+        new HashMap<String, HashSet<ParsedHost>>();
+
+    for (ParsedHost host : hosts) {
+      String thisComponent = host.nameComponent(level);
+
+      HashSet<ParsedHost> thisSet = topologies.get(thisComponent);
+
+      if (thisSet == null) {
+        thisSet = new HashSet<ParsedHost>();
+        topologies.put(thisComponent, thisSet);
+      }
+
+      thisSet.add(host);
+    }
+
+    children = new ArrayList<LoggedNetworkTopology>();
+
+    for (Map.Entry<String, HashSet<ParsedHost>> ent : topologies.entrySet()) {
+      children.add(new LoggedNetworkTopology(ent.getValue(), ent.getKey(),
+          level + 1));
+    }
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  public List<LoggedNetworkTopology> getChildren() {
+    return children;
+  }
+
+  void setChildren(List<LoggedNetworkTopology> children) {
+    this.children = children;
+  }
+
+  // Names: two nulls match; a null never matches a non-null.
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // Child lists: two nulls match; otherwise the sizes must agree and the
+  // children are compared pairwise in name order. The sort is done on copies
+  // so that a comparison never reorders either operand's own list. The index
+  // in a reported TreePath is therefore the position in name order.
+  private void compare1(List<LoggedNetworkTopology> c1,
+      List<LoggedNetworkTopology> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    List<LoggedNetworkTopology> sorted1 = new ArrayList<LoggedNetworkTopology>(
+        c1);
+    List<LoggedNetworkTopology> sorted2 = new ArrayList<LoggedNetworkTopology>(
+        c2);
+    Comparator<LoggedNetworkTopology> byName = new TopoSort();
+
+    Collections.sort(sorted1, byName);
+    Collections.sort(sorted2, byName);
+
+    for (int i = 0; i < sorted1.size(); ++i) {
+      sorted1.get(i).deepCompare(sorted2.get(i), new TreePath(loc, eltname, i));
+    }
+  }
+
+  /**
+   * Compares this node with {@code comparand}: first the node names, then the
+   * child subtrees, recursively and without regard to the order in which the
+   * children are stored.
+   *
+   * @param comparand
+   *          the object to compare against; it must be a
+   *          {@link LoggedNetworkTopology}
+   * @param loc
+   *          the parent path under which a mismatch is reported
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a {@link LoggedNetworkTopology}, or
+   *           the names or any of the subtrees differ
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedNetworkTopology)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedNetworkTopology other = (LoggedNetworkTopology) comparand;
+
+    compare1(name, other.name, loc, "name");
+    compare1(children, other.children, loc, "children");
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * A {@link LoggedSingleRelativeRanking} represents an X-Y coordinate of a
+ * single point in a discrete CDF.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ */
+public class LoggedSingleRelativeRanking implements DeepCompare {
+  /**
+   * The Y coordinate of the point, a fraction {@code ( 0.0D, 1.0D )}. The
+   * initial {@code -1.0D} marks a value that has not been filled in.
+   */
+  double relativeRanking = -1.0D;
+  /**
+   * The X coordinate of the point.
+   */
+  long datum = -1L;
+
+  public double getRelativeRanking() {
+    return relativeRanking;
+  }
+
+  void setRelativeRanking(double relativeRanking) {
+    this.relativeRanking = relativeRanking;
+  }
+
+  public long getDatum() {
+    return datum;
+  }
+
+  void setDatum(long datum) {
+    this.datum = datum;
+  }
+
+  // Returns normally when the two long values are equal; otherwise reports a
+  // mismatch at loc/eltname.
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == c2) {
+      return;
+    }
+
+    TreePath where = new TreePath(loc, eltname);
+    throw new DeepInequalityException(eltname + " miscompared", where);
+  }
+
+  // Returns normally when the two double values compare equal with ==;
+  // otherwise reports a mismatch at loc/eltname.
+  private void compare1(double c1, double c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == c2) {
+      return;
+    }
+
+    TreePath where = new TreePath(loc, eltname);
+    throw new DeepInequalityException(eltname + " miscompared", where);
+  }
+
+  /**
+   * Compares this point with {@code comparand}: first the relative ranking,
+   * then the datum.
+   *
+   * @param comparand
+   *          the object to compare against; it must be a
+   *          {@link LoggedSingleRelativeRanking}
+   * @param loc
+   *          the parent path under which a mismatch is reported
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a
+   *           {@link LoggedSingleRelativeRanking} or either coordinate differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (comparand instanceof LoggedSingleRelativeRanking) {
+      LoggedSingleRelativeRanking that = (LoggedSingleRelativeRanking) comparand;
+
+      compare1(this.relativeRanking, that.relativeRanking, loc,
+          "relativeRanking");
+      compare1(this.datum, that.datum, loc, "datum");
+      return;
+    }
+
+    throw new DeepInequalityException("comparand has wrong type", loc);
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobHistory;
+
+/**
+ * A {@link LoggedTask} represents a [hadoop] task that is part of a hadoop job.
+ * It knows about the [possibly empty] sequence of attempts, its I/O footprint,
+ * and its runtime.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ * Numeric fields start out as {@code -1}; the two list fields start out as
+ * empty lists and the remaining reference fields as {@code null}.
+ * 
+ */
+public class LoggedTask implements DeepCompare {
+  long inputBytes = -1L;
+  long inputRecords = -1L;
+  long outputBytes = -1L;
+  long outputRecords = -1L;
+  String taskID;
+  long startTime = -1L;
+  long finishTime = -1L;
+  JobHistory.Values taskType;
+  JobHistory.Values taskStatus;
+  List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
+
+  ArrayList<LoggedLocation> preferredLocations = new ArrayList<LoggedLocation>();
+
+  int numberMaps = -1;
+  int numberReduces = -1;
+
+  LoggedTask() {
+    super();
+  }
+
+  public long getInputBytes() {
+    return inputBytes;
+  }
+
+  void setInputBytes(long inputBytes) {
+    this.inputBytes = inputBytes;
+  }
+
+  public long getInputRecords() {
+    return inputRecords;
+  }
+
+  void setInputRecords(long inputRecords) {
+    this.inputRecords = inputRecords;
+  }
+
+  public long getOutputBytes() {
+    return outputBytes;
+  }
+
+  void setOutputBytes(long outputBytes) {
+    this.outputBytes = outputBytes;
+  }
+
+  public long getOutputRecords() {
+    return outputRecords;
+  }
+
+  void setOutputRecords(long outputRecords) {
+    this.outputRecords = outputRecords;
+  }
+
+  public String getTaskID() {
+    return taskID;
+  }
+
+  void setTaskID(String taskID) {
+    this.taskID = taskID;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public List<LoggedTaskAttempt> getAttempts() {
+    return attempts;
+  }
+
+  void setAttempts(List<LoggedTaskAttempt> attempts) {
+    this.attempts = attempts;
+  }
+
+  public ArrayList<LoggedLocation> getPreferredLocations() {
+    return preferredLocations;
+  }
+
+  void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) {
+    this.preferredLocations = preferredLocations;
+  }
+
+  public int getNumberMaps() {
+    return numberMaps;
+  }
+
+  void setNumberMaps(int numberMaps) {
+    this.numberMaps = numberMaps;
+  }
+
+  public int getNumberReduces() {
+    return numberReduces;
+  }
+
+  void setNumberReduces(int numberReduces) {
+    this.numberReduces = numberReduces;
+  }
+
+  public JobHistory.Values getTaskStatus() {
+    return taskStatus;
+  }
+
+  void setTaskStatus(JobHistory.Values taskStatus) {
+    this.taskStatus = taskStatus;
+  }
+
+  public JobHistory.Values getTaskType() {
+    return taskType;
+  }
+
+  void setTaskType(JobHistory.Values taskType) {
+    this.taskType = taskType;
+  }
+
+  /**
+   * Checks two integral field values for equality. The {@code int} fields are
+   * compared through this overload as well; they simply widen to {@code long}.
+   * 
+   * @throws DeepInequalityException
+   *           if the values differ; it carries {@code loc} extended by
+   *           {@code eltname}
+   */
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  /**
+   * Checks two strings for equality; two {@code null}s count as equal.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one value is {@code null} or the strings differ
+   */
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  /**
+   * Checks two {@link JobHistory.Values} for equality; two {@code null}s count
+   * as equal.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one value is {@code null} or the values differ
+   */
+  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+      TreePath loc, String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  /**
+   * Compares two location lists element by element, in list order. Two
+   * {@code null} lists count as equal.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one list is {@code null}, the sizes differ, or an
+   *           element pair miscompares (the path then includes the index)
+   */
+  private void compareLoggedLocations(ArrayList<LoggedLocation> c1,
+      ArrayList<LoggedLocation> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
+    }
+  }
+
+  /**
+   * Compares two attempt lists element by element, in list order. Two
+   * {@code null} lists count as equal.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one list is {@code null}, the sizes differ, or an
+   *           element pair miscompares (the path then includes the index)
+   */
+  private void compareLoggedTaskAttempts(List<LoggedTaskAttempt> c1,
+      List<LoggedTaskAttempt> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
+    }
+  }
+
+  /**
+   * Compares every instance variable of this task with the corresponding one
+   * of {@code comparand}, stopping at the first difference.
+   * 
+   * @param comparand
+   *          the object to compare against; must be a {@link LoggedTask}
+   * @param loc
+   *          the path of this object, used to report where a mismatch occurred
+   * @throws DeepInequalityException
+   *           if {@code comparand} has another type or any field differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedTask)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedTask other = (LoggedTask) comparand;
+
+    compare1(inputBytes, other.inputBytes, loc, "inputBytes");
+    compare1(inputRecords, other.inputRecords, loc, "inputRecords");
+    compare1(outputBytes, other.outputBytes, loc, "outputBytes");
+    compare1(outputRecords, other.outputRecords, loc, "outputRecords");
+
+    compare1(taskID, other.taskID, loc, "taskID");
+
+    compare1(startTime, other.startTime, loc, "startTime");
+    compare1(finishTime, other.finishTime, loc, "finishTime");
+
+    compare1(taskType, other.taskType, loc, "taskType");
+    compare1(taskStatus, other.taskStatus, loc, "taskStatus");
+
+    compareLoggedTaskAttempts(attempts, other.attempts, loc, "attempts");
+    compareLoggedLocations(preferredLocations, other.preferredLocations, loc,
+        "preferredLocations");
+
+    // These two were previously left out, so tasks differing only in them
+    // compared as equal. They go last so that the order in which the other
+    // mismatches are reported does not change.
+    compare1(numberMaps, other.numberMaps, loc, "numberMaps");
+    compare1(numberReduces, other.numberReduces, loc, "numberReduces");
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.JobHistory;
+
+// HACK ALERT!!!  This "should" have two subclasses, which might be called
+//                LoggedMapTaskAttempt and LoggedReduceTaskAttempt, but 
+//                the Jackson implementation of JSON doesn't handle a 
+//                superclass-valued field.
+
+/**
+ * A {@link LoggedTaskAttempt} represents an attempt to run a hadoop task in a
+ * hadoop job. Note that a task can have several attempts.
+ * 
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ * 
+ * Every {@code long} field starts out as {@code -1L}; every reference field
+ * starts out as {@code null}.
+ * 
+ */
+public class LoggedTaskAttempt implements DeepCompare {
+
+  String attemptID;
+  JobHistory.Values result;
+  long startTime = -1L;
+  long finishTime = -1L;
+  String hostName;
+
+  long hdfsBytesRead = -1L;
+  long hdfsBytesWritten = -1L;
+  long fileBytesRead = -1L;
+  long fileBytesWritten = -1L;
+  long mapInputRecords = -1L;
+  long mapInputBytes = -1L;
+  long mapOutputBytes = -1L;
+  long mapOutputRecords = -1L;
+  long combineInputRecords = -1L;
+  long reduceInputGroups = -1L;
+  long reduceInputRecords = -1L;
+  long reduceShuffleBytes = -1L;
+  long reduceOutputRecords = -1L;
+  long spilledRecords = -1L;
+
+  long shuffleFinished = -1L;
+  long sortFinished = -1L;
+
+  LoggedLocation location;
+
+  LoggedTaskAttempt() {
+    super();
+  }
+
+  public long getShuffleFinished() {
+    return this.shuffleFinished;
+  }
+
+  void setShuffleFinished(long shuffleFinished) {
+    this.shuffleFinished = shuffleFinished;
+  }
+
+  public long getSortFinished() {
+    return this.sortFinished;
+  }
+
+  void setSortFinished(long sortFinished) {
+    this.sortFinished = sortFinished;
+  }
+
+  public String getAttemptID() {
+    return this.attemptID;
+  }
+
+  void setAttemptID(String attemptID) {
+    this.attemptID = attemptID;
+  }
+
+  public JobHistory.Values getResult() {
+    return this.result;
+  }
+
+  void setResult(JobHistory.Values result) {
+    this.result = result;
+  }
+
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public long getHdfsBytesRead() {
+    return this.hdfsBytesRead;
+  }
+
+  void setHdfsBytesRead(long hdfsBytesRead) {
+    this.hdfsBytesRead = hdfsBytesRead;
+  }
+
+  public long getHdfsBytesWritten() {
+    return this.hdfsBytesWritten;
+  }
+
+  void setHdfsBytesWritten(long hdfsBytesWritten) {
+    this.hdfsBytesWritten = hdfsBytesWritten;
+  }
+
+  public long getFileBytesRead() {
+    return this.fileBytesRead;
+  }
+
+  void setFileBytesRead(long fileBytesRead) {
+    this.fileBytesRead = fileBytesRead;
+  }
+
+  public long getFileBytesWritten() {
+    return this.fileBytesWritten;
+  }
+
+  void setFileBytesWritten(long fileBytesWritten) {
+    this.fileBytesWritten = fileBytesWritten;
+  }
+
+  public long getMapInputRecords() {
+    return this.mapInputRecords;
+  }
+
+  void setMapInputRecords(long mapInputRecords) {
+    this.mapInputRecords = mapInputRecords;
+  }
+
+  public long getMapOutputBytes() {
+    return this.mapOutputBytes;
+  }
+
+  void setMapOutputBytes(long mapOutputBytes) {
+    this.mapOutputBytes = mapOutputBytes;
+  }
+
+  public long getMapOutputRecords() {
+    return this.mapOutputRecords;
+  }
+
+  void setMapOutputRecords(long mapOutputRecords) {
+    this.mapOutputRecords = mapOutputRecords;
+  }
+
+  public long getCombineInputRecords() {
+    return this.combineInputRecords;
+  }
+
+  void setCombineInputRecords(long combineInputRecords) {
+    this.combineInputRecords = combineInputRecords;
+  }
+
+  public long getReduceInputGroups() {
+    return this.reduceInputGroups;
+  }
+
+  void setReduceInputGroups(long reduceInputGroups) {
+    this.reduceInputGroups = reduceInputGroups;
+  }
+
+  public long getReduceInputRecords() {
+    return this.reduceInputRecords;
+  }
+
+  void setReduceInputRecords(long reduceInputRecords) {
+    this.reduceInputRecords = reduceInputRecords;
+  }
+
+  public long getReduceShuffleBytes() {
+    return this.reduceShuffleBytes;
+  }
+
+  void setReduceShuffleBytes(long reduceShuffleBytes) {
+    this.reduceShuffleBytes = reduceShuffleBytes;
+  }
+
+  public long getReduceOutputRecords() {
+    return this.reduceOutputRecords;
+  }
+
+  void setReduceOutputRecords(long reduceOutputRecords) {
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  public long getSpilledRecords() {
+    return this.spilledRecords;
+  }
+
+  void setSpilledRecords(long spilledRecords) {
+    this.spilledRecords = spilledRecords;
+  }
+
+  public LoggedLocation getLocation() {
+    return this.location;
+  }
+
+  void setLocation(LoggedLocation location) {
+    this.location = location;
+  }
+
+  public long getMapInputBytes() {
+    return this.mapInputBytes;
+  }
+
+  void setMapInputBytes(long mapInputBytes) {
+    this.mapInputBytes = mapInputBytes;
+  }
+
+  /**
+   * Checks two strings for equality; two {@code null}s count as equal.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one value is {@code null} or the strings differ; it
+   *           carries {@code loc} extended by {@code eltname}
+   */
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    boolean bothAbsent = c1 == null && c2 == null;
+    boolean bothPresent = c1 != null && c2 != null;
+
+    if (bothAbsent || (bothPresent && c1.equals(c2))) {
+      return;
+    }
+
+    String message = eltname + " miscompared";
+    throw new DeepInequalityException(message, new TreePath(loc, eltname));
+  }
+
+  /**
+   * Checks two {@code long} values for equality.
+   * 
+   * @throws DeepInequalityException
+   *           if the values differ; it carries {@code loc} extended by
+   *           {@code eltname}
+   */
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == c2) {
+      return;
+    }
+
+    String message = eltname + " miscompared";
+    throw new DeepInequalityException(message, new TreePath(loc, eltname));
+  }
+
+  /**
+   * Checks two {@link JobHistory.Values} by reference identity, which also
+   * makes two {@code null}s equal.
+   * 
+   * @throws DeepInequalityException
+   *           if the two references are not the same; it carries {@code loc}
+   *           extended by {@code eltname}
+   */
+  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+      TreePath loc, String eltname) throws DeepInequalityException {
+    if (c1 == c2) {
+      return;
+    }
+
+    String message = eltname + " miscompared";
+    throw new DeepInequalityException(message, new TreePath(loc, eltname));
+  }
+
+  /**
+   * Compares two locations. Two {@code null}s are equal; if both are present
+   * the comparison is delegated to {@code c1.deepCompare}, using {@code loc}
+   * extended by {@code eltname} as the path.
+   * 
+   * @throws DeepInequalityException
+   *           if exactly one location is {@code null}, or the delegated
+   *           comparison fails
+   */
+  private void compare1(LoggedLocation c1, LoggedLocation c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath child = new TreePath(loc, eltname);
+
+    if (c1 != null && c2 != null) {
+      c1.deepCompare(c2, child);
+      return;
+    }
+
+    throw new DeepInequalityException(eltname + " miscompared", child);
+  }
+
+  /**
+   * Compares every instance variable of this attempt with the corresponding
+   * one of {@code comparand}, stopping at the first difference.
+   * 
+   * @param comparand
+   *          the object to compare against; must be a
+   *          {@link LoggedTaskAttempt}
+   * @param loc
+   *          the path of this object, used to report where a mismatch occurred
+   * @throws DeepInequalityException
+   *           if {@code comparand} has another type or any field differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedTaskAttempt)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    final LoggedTaskAttempt that = (LoggedTaskAttempt) comparand;
+
+    // identity and timing
+    compare1(attemptID, that.attemptID, loc, "attemptID");
+    compare1(result, that.result, loc, "result");
+    compare1(startTime, that.startTime, loc, "startTime");
+    compare1(finishTime, that.finishTime, loc, "finishTime");
+    compare1(hostName, that.hostName, loc, "hostName");
+
+    // counters
+    compare1(hdfsBytesRead, that.hdfsBytesRead, loc, "hdfsBytesRead");
+    compare1(hdfsBytesWritten, that.hdfsBytesWritten, loc, "hdfsBytesWritten");
+    compare1(fileBytesRead, that.fileBytesRead, loc, "fileBytesRead");
+    compare1(fileBytesWritten, that.fileBytesWritten, loc, "fileBytesWritten");
+    compare1(mapInputBytes, that.mapInputBytes, loc, "mapInputBytes");
+    compare1(mapInputRecords, that.mapInputRecords, loc, "mapInputRecords");
+    compare1(mapOutputBytes, that.mapOutputBytes, loc, "mapOutputBytes");
+    compare1(mapOutputRecords, that.mapOutputRecords, loc, "mapOutputRecords");
+    compare1(combineInputRecords, that.combineInputRecords, loc,
+        "combineInputRecords");
+    compare1(reduceInputGroups, that.reduceInputGroups, loc,
+        "reduceInputGroups");
+    compare1(reduceInputRecords, that.reduceInputRecords, loc,
+        "reduceInputRecords");
+    compare1(reduceShuffleBytes, that.reduceShuffleBytes, loc,
+        "reduceShuffleBytes");
+    compare1(reduceOutputRecords, that.reduceOutputRecords, loc,
+        "reduceOutputRecords");
+    compare1(spilledRecords, that.spilledRecords, loc, "spilledRecords");
+
+    // reduce phase boundaries
+    compare1(shuffleFinished, that.shuffleFinished, loc, "shuffleFinished");
+    compare1(sortFinished, that.sortFinished, loc, "sortFinished");
+
+    compare1(location, that.location, loc, "location");
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * A {@link TaskAttemptInfo} for a map-task attempt. On top of what the
+ * superclass holds it records a single runtime value, which is reported both
+ * as the map-phase runtime and as the overall runtime of the attempt.
+ */
+public class MapTaskAttemptInfo extends TaskAttemptInfo {
+
+  private long runtime;
+
+  /**
+   * @param state
+   *          passed through to the superclass constructor
+   * @param taskInfo
+   *          passed through to the superclass constructor
+   * @param runtime
+   *          the runtime of the map phase of this attempt
+   */
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {
+    super(state, taskInfo);
+    this.runtime = runtime;
+  }
+
+  /**
+   * @return the same value as {@link #getMapRuntime()}
+   */
+  @Override
+  public long getRuntime() {
+    return getMapRuntime();
+  }
+
+  /**
+   * Get the runtime for the <b>map</b> phase of the map-task attempt.
+   * 
+   * @return the runtime for the <b>map</b> phase of the map-task attempt
+   */
+  public long getMapRuntime() {
+    return this.runtime;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * An immutable holder for two values of independent types. Either value may
+ * be {@code null}; the class does not define {@code equals} or
+ * {@code hashCode}, so pairs compare by identity.
+ * 
+ * @param <CarType>
+ *          the type of the first element
+ * @param <CdrType>
+ *          the type of the second element
+ */
+class Pair<CarType, CdrType> {
+  private final CarType car;
+  private final CdrType cdr;
+
+  /**
+   * @param car
+   *          the first element
+   * @param cdr
+   *          the second element
+   */
+  Pair(CarType car, CdrType cdr) {
+    this.car = car;
+    this.cdr = cdr;
+  }
+
+  /**
+   * @return the first element, exactly as given to the constructor
+   */
+  CarType first() {
+    return this.car;
+  }
+
+  /**
+   * @return the second element, exactly as given to the constructor
+   */
+  CdrType second() {
+    return this.cdr;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.io.StringReader;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Element;
+import org.w3c.dom.Text;
+
+import org.xml.sax.SAXException;
+
+/**
+ * The handful of settings this tool cares about, extracted from the XML text
+ * of a job configuration file. All fields are fixed at construction time.
+ * Integer fields are {@code -1} and string fields are {@code null} when the
+ * corresponding property was not found (or could not be parsed).
+ */
+class ParsedConfigFile {
+  static Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+
+  static Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+
+  final int heapMegabytes;
+
+  final String queue;
+  final String jobName;
+
+  final int clusterMapMB;
+  final int clusterReduceMB;
+  final int jobMapMB;
+  final int jobReduceMB;
+
+  final String jobID;
+
+  /**
+   * {@code false} if the XML could not be parsed or its root element is not
+   * {@code <configuration>}.
+   */
+  final boolean valid;
+
+  /**
+   * Returns {@code value} parsed as an int when {@code attr} names the
+   * property {@code propName}; otherwise, or when the value is missing or not
+   * a valid int, returns {@code oldValue}.
+   */
+  private int maybeGetIntValue(String propName, String attr, String value,
+      int oldValue) {
+    if (propName.equals(attr) && value != null) {
+      try {
+        return Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        return oldValue;
+      }
+    }
+
+    return oldValue;
+  }
+
+  /**
+   * Returns the character data of {@code field}'s first child when that child
+   * is a text node, and {@code null} otherwise (no children, or the first
+   * child is, say, a comment).
+   */
+  private static String firstChildText(Element field) {
+    Node first = field.getFirstChild();
+
+    return (first instanceof Text) ? ((Text) first).getData() : null;
+  }
+
+  /**
+   * Extracts the {@code -Xmx} heap size, in megabytes, from a JVM option
+   * string. Returns {@code oldValue} when there is no {@code -Xmx<n>[mMgG]}
+   * option or the size does not fit in an {@code int}.
+   */
+  private static int parseHeapMegabytes(String javaOpts, int oldValue) {
+    Matcher matcher = heapPattern.matcher(javaOpts);
+
+    if (!matcher.find()) {
+      return oldValue;
+    }
+
+    long megabytes;
+
+    try {
+      megabytes = Long.parseLong(matcher.group(1));
+    } catch (NumberFormatException e) {
+      // more digits than even a long can hold
+      return oldValue;
+    }
+
+    // Checking before scaling keeps the multiplication below from
+    // overflowing the long.
+    if (megabytes > Integer.MAX_VALUE) {
+      return oldValue;
+    }
+
+    if (matcher.group(2).equalsIgnoreCase("G")) {
+      megabytes *= 1024L;
+    }
+
+    return megabytes > Integer.MAX_VALUE ? oldValue : (int) megabytes;
+  }
+
+  /**
+   * Parses one configuration file.
+   * 
+   * @param filenameLine
+   *          text containing the file name; the job ID is taken from the first
+   *          {@code _job_<digits>_<digits>_} occurrence in it, if any
+   * @param xmlString
+   *          the complete XML text of the configuration file
+   */
+  ParsedConfigFile(String filenameLine, String xmlString) {
+    super();
+
+    int heapMegabytes = -1;
+
+    String queue = null;
+    String jobName = null;
+
+    int clusterMapMB = -1;
+    int clusterReduceMB = -1;
+    int jobMapMB = -1;
+    int jobReduceMB = -1;
+
+    String jobID = null;
+
+    boolean valid = true;
+
+    Matcher jobIDMatcher = jobIDPattern.matcher(filenameLine);
+
+    if (jobIDMatcher.find()) {
+      jobID = jobIDMatcher.group(1);
+    }
+
+    try {
+      // NOTE(review): the parser runs with default settings (DTDs and
+      // external entities enabled) -- confirm the inputs are trusted.
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+      DocumentBuilder db = dbf.newDocumentBuilder();
+
+      // Parse from characters rather than from xmlString.getBytes(): the
+      // latter re-encodes with the platform charset, which need not match
+      // the encoding the XML declaration announces.
+      Document doc = db.parse(new org.xml.sax.InputSource(new StringReader(
+          xmlString)));
+
+      Element root = doc.getDocumentElement();
+
+      if (!"configuration".equals(root.getTagName())) {
+        System.out.print("root is not a configuration node");
+        valid = false;
+      }
+
+      NodeList props = root.getChildNodes();
+
+      for (int i = 0; i < props.getLength(); ++i) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName())) {
+          System.out.print("bad conf file: element not <property>");
+        }
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element)) {
+            continue;
+          }
+
+          Element field = (Element) fieldNode;
+          String text = firstChildText(field);
+          if (text == null) {
+            continue;
+          }
+
+          if ("name".equals(field.getTagName())) {
+            attr = text.trim();
+          } else if ("value".equals(field.getTagName())) {
+            value = text;
+          }
+        }
+
+        if ("mapred.child.java.opts".equals(attr) && value != null) {
+          heapMegabytes = parseHeapMegabytes(value, heapMegabytes);
+        }
+
+        if ("mapred.job.queue.name".equals(attr) && value != null) {
+          queue = value;
+        }
+
+        if ("mapred.job.name".equals(attr) && value != null) {
+          jobName = value;
+        }
+
+        clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
+            value, clusterMapMB);
+        clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
+            attr, value, clusterReduceMB);
+        jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
+            jobMapMB);
+        jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
+            value, jobReduceMB);
+      }
+
+      // valid is deliberately NOT reset to true here: doing so would erase
+      // the "root is not a configuration node" verdict reached above.
+    } catch (ParserConfigurationException e) {
+      valid = false;
+    } catch (SAXException e) {
+      valid = false;
+    } catch (IOException e) {
+      valid = false;
+    }
+
+    this.heapMegabytes = heapMegabytes;
+
+    this.queue = queue;
+    this.jobName = jobName;
+
+    this.clusterMapMB = clusterMapMB;
+    this.clusterReduceMB = clusterReduceMB;
+    this.jobMapMB = jobMapMB;
+    this.jobReduceMB = jobReduceMB;
+
+    this.jobID = jobID;
+
+    this.valid = valid;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+/**
+ * A host location split into its two components: the rack name and the node
+ * name.
+ */
+class ParsedHost {
+  String rackName;
+  String nodeName;
+
+  private static final Pattern splitPattern = Pattern
+      .compile("/([0-9\\\\\\.]+)/(.+)");
+
+  /**
+   * @return the number of distinct values {@link #distance(ParsedHost)} can
+   *         return
+   */
+  static int numberOfDistances() {
+    return 3;
+  }
+
+  /**
+   * @return the components, broadest first [ie., the last element is always the
+   *         individual node name]
+   */
+  String[] nameComponents() {
+    return new String[] { rackName, nodeName };
+  }
+
+  /**
+   * @param i
+   *          {@code 0} for the rack name, {@code 1} for the node name
+   * @return the requested component
+   * @throws IllegalArgumentException
+   *           if {@code i} is neither 0 nor 1
+   */
+  String nameComponent(int i) throws IllegalArgumentException {
+    switch (i) {
+    case 0:
+      return rackName;
+
+    case 1:
+      return nodeName;
+
+    default:
+      throw new IllegalArgumentException(
+          "Host location component index out of range.");
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return rackName.hashCode() * 17 + nodeName.hashCode();
+  }
+
+  /**
+   * Parses a designator of the form {@code /<rack>/<node>}, where the rack
+   * part is made up of digits, dots and backslashes.
+   * 
+   * @param name
+   *          the designator to parse
+   * @throws IllegalArgumentException
+   *           if {@code name} does not have that form
+   */
+  ParsedHost(String name) throws IllegalArgumentException {
+    // separate out the node name
+    Matcher matcher = splitPattern.matcher(name);
+
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Illegal node designator: \"" + name
+          + "\"");
+    }
+
+    rackName = matcher.group(1);
+    nodeName = matcher.group(2);
+  }
+
+  /**
+   * Builds a host from the first two layers of {@code loc}: layer 0 becomes
+   * the rack name and layer 1 the node name.
+   */
+  public ParsedHost(LoggedLocation loc) {
+    List<String> coordinates = loc.getLayers();
+
+    rackName = coordinates.get(0);
+    nodeName = coordinates.get(1);
+  }
+
+  /**
+   * @return a new {@link LoggedLocation} whose layers are the rack name
+   *         followed by the node name
+   */
+  LoggedLocation makeLoggedLocation() {
+    LoggedLocation result = new LoggedLocation();
+
+    List<String> coordinates = new ArrayList<String>();
+
+    coordinates.add(rackName);
+    coordinates.add(nodeName);
+
+    result.setLayers(coordinates);
+
+    return result;
+  }
+
+  // expects the broadest name first
+  ParsedHost(String[] names) {
+    rackName = names[0];
+    nodeName = names[1];
+  }
+
+  // returns the broadest name first
+  String[] nameList() {
+    return nameComponents();
+  }
+
+  /**
+   * Two hosts are equal when both their rack names and their node names are
+   * equal. This used to test only the node name, which disagreed with
+   * {@link #hashCode()} (the hash also mixes in the rack name) and so broke
+   * hash-based collections. Note that {@link #distance(ParsedHost)} still
+   * returns 0 on a node-name match alone.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (!(other instanceof ParsedHost)) {
+      return false;
+    }
+
+    ParsedHost host = (ParsedHost) other;
+
+    return nodeName.equals(host.nodeName) && rackName.equals(host.rackName);
+  }
+
+  /**
+   * @return 0 if the node names are equal, otherwise 1 if the rack names are
+   *         equal, otherwise 2
+   */
+  int distance(ParsedHost other) {
+    if (nodeName.equals(other.nodeName)) {
+      return 0;
+    }
+
+    if (rackName.equals(other.rackName)) {
+      return 1;
+    }
+
+    return 2;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/**
+ * One line of a job history log, split into a record type and a set of
+ * key/value properties. The text before the first space names the type; the
+ * rest is read as space-separated <code>key="value"</code> or
+ * <code>key=value</code> pairs. A quoted value ends at the next double quote;
+ * backslash escapes are not interpreted here.
+ */
+class ParsedLine {
+
+  Properties content;
+
+  LogRecordType type;
+
+  static Pattern keyValPair = Pattern
+      .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+
+  /** Creates an instance whose type and content are both left unset. */
+  ParsedLine() {
+  }
+
+  /**
+   * Parses one log line into {@link #type} and {@link #content}.
+   *
+   * @param fullLine the raw line; an empty line or one that starts with a
+   *          space is treated as junk and leaves the type unset
+   * @param version not consulted by this parser
+   */
+  ParsedLine(String fullLine, int version) {
+    content = new Properties();
+
+    int firstSpace = fullLine.indexOf(" ");
+
+    if (firstSpace < 0) {
+      firstSpace = fullLine.length();
+    }
+
+    if (firstSpace == 0) {
+      return; // This is a junk line of some sort
+    }
+
+    type = LogRecordType.intern(fullLine.substring(0, firstSpace));
+
+    // A line holding only a record type has nothing after it; asking for
+    // substring(length + 1) would throw StringIndexOutOfBoundsException.
+    if (firstSpace < fullLine.length()) {
+      parseProperties(fullLine.substring(firstSpace + 1));
+    }
+  }
+
+  /** Stores every key/value pair found in propValPairs into content. */
+  private void parseProperties(String propValPairs) {
+    final int length = propValPairs.length();
+    int cursor = 0;
+
+    while (cursor < length) {
+      if (propValPairs.charAt(cursor) == ' ') {
+        ++cursor; // spaces before a key belong to no pair
+        continue;
+      }
+
+      int equals = propValPairs.indexOf('=', cursor);
+      if (equals < 0) {
+        return; // trailing text without a '=' is ignored
+      }
+
+      int startValue = equals + 1;
+      int endValue;
+      int nextCursor;
+
+      // the length check covers a '=' that is the last character of the line
+      if (startValue < length && propValPairs.charAt(startValue) == '\"') {
+        ++startValue; // the value starts after the opening quote
+        int closeQuote = propValPairs.indexOf('\"', startValue);
+        // an unterminated quote makes the value run to the end of the line
+        endValue = closeQuote < 0 ? length : closeQuote;
+        nextCursor = closeQuote < 0 ? length : closeQuote + 1;
+      } else {
+        // an unquoted value starts right after the '=' and ends at a space
+        int closeSpace = propValPairs.indexOf(' ', startValue);
+        endValue = closeSpace < 0 ? length : closeSpace;
+        nextCursor = endValue;
+      }
+
+      content.setProperty(propValPairs.substring(cursor, equals),
+          propValPairs.substring(startValue, endValue));
+
+      cursor = nextCursor;
+    }
+  }
+
+  /** Returns the record type; null when the line was junk or never parsed. */
+  protected LogRecordType getType() {
+    return type;
+  }
+
+  /** Returns the value stored for key, or null when there is none. */
+  protected String get(String key) {
+    return content.getProperty(key);
+  }
+
+  /** Parses get(key) as a long; throws NumberFormatException if it cannot. */
+  protected long getLong(String key) {
+    return Long.parseLong(get(key));
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.InputStream;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.DeserializationConfig;
+
+/**
+ * {@link Parser} is an interface to the underlying JSON files. To use the
+ * parser, create a Parser object, and Parser.getNextJob() will return next job
+ * in the trace every time it is called.
+ */
+public class Parser implements Closeable {
+  private final JsonParser parser;
+  private final ObjectMapper mapper;
+  private LoggedNetworkTopology topology;
+
+  // DEBUG: leaves parser and mapper null, so only close() is usable
+  public Parser() {
+    parser = null;
+    mapper = null;
+  }
+
+  public Parser(java.io.Reader digest) throws IOException {
+    mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    parser = mapper.getJsonFactory().createJsonParser(digest);
+  }
+
+  public Parser(InputStream stream) throws JsonParseException, IOException {
+    mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    parser = mapper.getJsonFactory().createJsonParser(stream);
+  }
+
+  /** Returns the next job, or null when none is read or the input ends. */
+  public JobStory getNextJob() throws IOException {
+    try {
+      final LoggedJob job = mapper.readValue(parser, LoggedJob.class);
+      return null == job ? null : new ZombieJob(job, topology);
+    } catch (EOFException e) {
+      return null;
+    } catch (JsonProcessingException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Reads the topology from a file and keeps it for jobs read afterwards. */
+  public LoggedNetworkTopology readTopology(File topologyFile)
+      throws JsonParseException, JsonMappingException, IOException {
+    this.topology = mapper.readValue(topologyFile, LoggedNetworkTopology.class);
+    return this.topology;
+  }
+
+  /** Reads the topology from a stream and keeps it for jobs read afterwards. */
+  public LoggedNetworkTopology readTopology(InputStream stream)
+      throws JsonParseException, JsonMappingException, IOException {
+    this.topology = mapper.readValue(stream, LoggedNetworkTopology.class);
+    return this.topology;
+  }
+
+  public void close() throws IOException {
+    if (parser != null) { // the no-argument debug constructor leaves it null
+      parser.close();
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * A reduce task-attempt together with the runtimes of its three phases:
+ * shuffle, merge and reduce.
+ */
+public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
+
+  private long shuffleTime;
+  private long mergeTime;
+  private long reduceTime;
+
+  public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+      long mergeTime, long reduceTime) {
+    super(state, taskInfo);
+    this.reduceTime = reduceTime;
+    this.mergeTime = mergeTime;
+    this.shuffleTime = shuffleTime;
+  }
+
+  /**
+   * @return the runtime for the <b>reduce</b> phase of the reduce task-attempt
+   */
+  public long getReduceRuntime() {
+    return reduceTime;
+  }
+
+  /**
+   * @return the runtime for the <b>shuffle</b> phase of the reduce task-attempt
+   */
+  public long getShuffleRuntime() {
+    return shuffleTime;
+  }
+
+  /**
+   * @return the runtime for the <b>merge</b> phase of the reduce task-attempt
+   */
+  public long getMergeRuntime() {
+    return mergeTime;
+  }
+
+  /** Adds up the shuffle, merge and reduce runtimes reported by the getters. */
+  @Override
+  public long getRuntime() {
+    // XXX Does this make sense? Should TaskAttemptInfo.getRuntime() return
+    // total reduce runtime, rather than reduce phase time?
+    final long shuffle = getShuffleRuntime();
+    final long merge = getMergeRuntime();
+    return shuffle + merge + getReduceRuntime();
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link TaskAttemptInfo} is a collection of statistics about a particular
+ * task-attempt gleaned from job-history of the job.
+ */
+public abstract class TaskAttemptInfo {
+  protected final State state;
+  protected final TaskInfo taskInfo;
+
+  /**
+   * @param state final state of the attempt; only SUCCEEDED and FAILED are
+   *          accepted
+   * @param taskInfo the value later returned by {@link #getTaskInfo()}
+   * @throws IllegalArgumentException for any other state, including null
+   */
+  protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+    if (state != State.SUCCEEDED && state != State.FAILED) {
+      throw new IllegalArgumentException("status cannot be " + state);
+    }
+
+    this.state = state;
+    this.taskInfo = taskInfo;
+  }
+
+  /**
+   * @return the final {@link TaskStatus.State} of the task-attempt
+   */
+  public State getRunState() {
+    return state;
+  }
+
+  /**
+   * @return the total runtime for the task-attempt
+   */
+  public abstract long getRuntime();
+
+  /**
+   * @return the {@link TaskInfo} for the given task-attempt
+   */
+  public TaskInfo getTaskInfo() {
+    return taskInfo;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Immutable holder for the input, output and memory figures of one task.
+ */
+public class TaskInfo {
+  private final long inputBytes;
+  private final int inputRecords;
+  private final long outputBytes;
+  private final int outputRecords;
+  private final long taskMemory;
+
+  /**
+   * Each argument is stored as given and handed back by the matching getter;
+   * maxMemory is widened from int to long.
+   */
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      int maxMemory) {
+    inputBytes = bytesIn;
+    inputRecords = recsIn;
+    outputBytes = bytesOut;
+    outputRecords = recsOut;
+    taskMemory = maxMemory;
+  }
+
+  /**
+   * @return Raw bytes read from the FileSystem into the task. Note that this
+   *         may not always match the input bytes to the task.
+   */
+  public long getInputBytes() {
+    return inputBytes;
+  }
+
+  /** @return Number of records input to this task. */
+  public int getInputRecords() {
+    return inputRecords;
+  }
+
+  /**
+   * @return Raw bytes written to the destination FileSystem. Note that this may
+   *         not match output bytes.
+   */
+  public long getOutputBytes() {
+    return outputBytes;
+  }
+
+  /** @return Number of records output from this task. */
+  public int getOutputRecords() {
+    return outputRecords;
+  }
+
+  /** @return Memory used by the task leq the heap size. */
+  public long getTaskMemory() {
+    return taskMemory;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * This describes a path from a node to the root. We use it when we compare two
+ * trees during rumen unit tests. If the trees are not identical, this chain
+ * will be converted to a string which describes the path from the root to the
+ * fields that did not compare.
+ */
+public class TreePath {
+  /** Index value of a segment that is rendered without a subscript. */
+  private static final int NO_INDEX = -1;
+
+  final TreePath parent;
+  final String fieldName;
+  final int index;
+
+  /** Creates a segment without a subscript; a null parent marks the root. */
+  public TreePath(TreePath parent, String fieldName) {
+    this(parent, fieldName, NO_INDEX);
+  }
+
+  /** Creates a segment shown as fieldName[index]; -1 means no subscript. */
+  public TreePath(TreePath parent, String fieldName, int index) {
+    super();
+    this.parent = parent;
+    this.fieldName = fieldName;
+    this.index = index;
+  }
+
+  /** Renders the chain root first, joining the segments with "-->". */
+  @Override
+  public String toString() {
+    String subscript = (index == NO_INDEX) ? "" : "[" + index + "]";
+    String prefix = (parent == null) ? "" : parent.toString() + "-->";
+
+    return prefix + fieldName + subscript;
+  }
+}