You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC

svn commit: r1077079 [11/11] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/g...

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link Node} represents a node in the cluster topology. A node can be a
+ * {@link MachineNode}, or a {@link RackNode}, etc.
+ */
+public class Node implements Comparable<Node> {
+  private static final SortedSet<Node> EMPTY_SET = 
+    Collections.unmodifiableSortedSet(new TreeSet<Node>());
+  private Node parent; // set when a parent's addChild() accepts this node
+  private final String name;
+  private final int level;
+  private SortedSet<Node> children; // created lazily by addChild()
+
+  /**
+   * @param name
+   *          A unique name to identify a node in the cluster. Must not be null.
+   * @param level
+   *          The level of the node in the cluster. Must not be negative.
+   */
+  public Node(String name, int level) {
+    if (name == null) {
+      throw new IllegalArgumentException("Node name cannot be null");
+    }
+
+    if (level < 0) {
+      throw new IllegalArgumentException("Level cannot be negative");
+    }
+
+    this.name = name;
+    this.level = level;
+  }
+
+  /**
+   * Get the name of the node.
+   * 
+   * @return The name of the node.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get the level of the node.
+   * @return The level of the node.
+   */
+  public int getLevel() {
+    return level;
+  }
+  
+  private void checkChildren() { // creates the children set on first use
+    if (children == null) {
+      children = new TreeSet<Node>();
+    }
+  }
+
+  /**
+   * Add a child node to this node and record this node as the child's parent.
+   * @param child The child to add; IllegalArgumentException if it already has a parent.
+   * @return true if the child was added; false if a child with that name already exists.
+   */
+  public synchronized boolean addChild(Node child) {
+    if (child.parent != null) {
+      throw new IllegalArgumentException(
+          "The child is already under another node:" + child.parent);
+    }
+    checkChildren();
+    boolean retval = children.add(child); // TreeSet: duplicates decided by compareTo()
+    if (retval) child.parent = this;
+    return retval;
+  }
+
+  /**
+   * Does this node have any children?
+   * @return Boolean indicating whether this node has any children.
+   */
+  public synchronized boolean hasChildren() {
+    return children != null && !children.isEmpty();
+  }
+
+  /**
+   * Get the children of this node.
+   * 
+   * @return The children of this node. If no child, an empty set will be
+   *         returned. The returned set is read-only.
+   */
+  public synchronized Set<Node> getChildren() {
+    return (children == null) ? EMPTY_SET : 
+      Collections.unmodifiableSortedSet(children);
+  }
+  
+  /**
+   * Get the parent node.
+   * @return the parent node, or null if no node has added this one as a child.
+   */
+  public Node getParent() {
+    return parent;
+  }
+  
+  @Override
+  public int hashCode() {
+    return name.hashCode(); // level is not part of the hash
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (obj.getClass() != this.getClass()) // exact class match, subclasses differ
+      return false;
+    Node other = (Node) obj;
+    return name.equals(other.name); // level is not compared
+  }
+  
+  @Override
+  public String toString() {
+    return "(" + name +", " + level +")";
+  }
+
+  @Override
+  public int compareTo(Node o) {
+    return name.compareTo(o.name); // NOTE(review): name only; equals() also checks the class
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+class Pair<CarType, CdrType> { // holder of two values fixed at construction
+  private final CarType car; // first element
+  private final CdrType cdr; // second element
+
+  Pair(CarType car, CdrType cdr) {
+    super();
+
+    this.car = car;
+    this.cdr = cdr;
+  }
+
+  CarType first() { // the value passed as car
+    return car;
+  }
+
+  CdrType second() { // the value passed as cdr
+    return cdr;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Element;
+import org.w3c.dom.Text;
+
+import org.xml.sax.SAXException;
+
+/** Values of interest extracted from one job configuration XML file. */
+class ParsedConfigFile {
+  private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+  private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+
+  final int heapMegabytes;
+  final String queue;
+  final String jobName;
+
+  final int clusterMapMB;
+  final int clusterReduceMB;
+  final int jobMapMB;
+  final int jobReduceMB;
+  final String jobID;
+  final boolean valid;
+
+  /** Returns value as an int when attr equals propName and value parses, else oldValue. */
+  private int maybeGetIntValue(String propName, String attr, String value,
+      int oldValue) {
+    if (propName.equals(attr) && value != null) {
+      try {
+        return Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        return oldValue;
+      }
+    }
+    return oldValue;
+  }
+
+  /** Data of the first child of field if that child is a text node, else null. */
+  private static String firstText(Element field) {
+    Node first = field.getFirstChild();
+    return (first instanceof Text) ? ((Text) first).getData() : null;
+  }
+
+  /**
+   * Returns the -Xmx size found in <code>javaOpts</code> in megabytes, or
+   * <code>oldValue</code> when it is absent or does not fit in an int.
+   */
+  private static int parseHeapMegabytes(String javaOpts, int oldValue) {
+    Matcher matcher = heapPattern.matcher(javaOpts);
+    if (!matcher.find()) {
+      return oldValue;
+    }
+    long scale = matcher.group(2).equalsIgnoreCase("G") ? 1024 : 1;
+    try {
+      long size = Long.parseLong(matcher.group(1));
+      return (size <= Integer.MAX_VALUE / scale) ? (int) (size * scale) : oldValue;
+    } catch (NumberFormatException e) {
+      return oldValue; // more digits than a long can hold
+    }
+  }
+
+  /**
+   * Parses one job configuration file.
+   * @param filenameLine text naming the file; a job ID of the form job_N_N
+   *          enclosed in underscores is extracted from it when present
+   * @param xmlString the XML content of the configuration file
+   */
+  @SuppressWarnings("hiding")
+  ParsedConfigFile(String filenameLine, String xmlString) {
+    super();
+    int heapMegabytes = -1;
+    String queue = null;
+    String jobName = null;
+    int clusterMapMB = -1;
+    int clusterReduceMB = -1;
+    int jobMapMB = -1;
+    int jobReduceMB = -1;
+    String jobID = null;
+
+    // Stays true unless the root element is wrong or the XML cannot be parsed.
+    boolean valid = true;
+
+    Matcher jobIDMatcher = jobIDPattern.matcher(filenameLine);
+    if (jobIDMatcher.find()) {
+      jobID = jobIDMatcher.group(1);
+    }
+
+    try {
+      // NOTE(review): getBytes() encodes with the platform default charset.
+      InputStream is = new ByteArrayInputStream(xmlString.getBytes());
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+      DocumentBuilder db = dbf.newDocumentBuilder();
+      Document doc = db.parse(is);
+      Element root = doc.getDocumentElement();
+
+      if (!"configuration".equals(root.getTagName())) {
+        System.out.print("root is not a configuration node");
+        valid = false;
+      }
+
+      NodeList props = root.getChildNodes();
+      for (int i = 0; i < props.getLength(); ++i) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element))
+          continue;
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName())) {
+          System.out.print("bad conf file: element not <property>");
+        }
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element)) {
+            continue;
+          }
+          Element field = (Element) fieldNode;
+          String text = firstText(field);
+          if (text == null) {
+            continue;
+          }
+          if ("name".equals(field.getTagName())) {
+            attr = text.trim();
+          }
+          if ("value".equals(field.getTagName())) {
+            value = text;
+          }
+        }
+
+        if ("mapred.child.java.opts".equals(attr) && value != null) {
+          heapMegabytes = parseHeapMegabytes(value, heapMegabytes);
+        }
+        if ("mapred.job.queue.name".equals(attr) && value != null) {
+          queue = value;
+        }
+        if ("mapred.job.name".equals(attr) && value != null) {
+          jobName = value;
+        }
+        clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
+            value, clusterMapMB);
+        clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
+            attr, value, clusterReduceMB);
+        jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
+            jobMapMB);
+        jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
+            value, jobReduceMB);
+      }
+    } catch (ParserConfigurationException e) {
+      valid = false;
+    } catch (SAXException e) {
+      valid = false;
+    } catch (IOException e) {
+      valid = false;
+    }
+
+    this.heapMegabytes = heapMegabytes;
+    this.queue = queue;
+    this.jobName = jobName;
+    this.clusterMapMB = clusterMapMB;
+    this.clusterReduceMB = clusterReduceMB;
+    this.jobMapMB = jobMapMB;
+    this.jobReduceMB = jobReduceMB;
+    this.jobID = jobID;
+    this.valid = valid;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+class ParsedHost { // a host location made of a rack name and a node name
+  private final String rackName;
+  private final String nodeName;
+
+  /**
+   * TODO the following only works for /rack/host format. Change to support
+   * arbitrary level of network names.
+   */
+  private static final Pattern splitPattern = Pattern
+      .compile("/([^/]+)/([^/]+)");
+
+  /**
+   * Count of distinct distance() results. TODO handle arbitrary level of network names.
+   */
+  static int numberOfDistances() {
+    return 3; // distance() yields 0, 1 or 2
+  }
+
+  String nameComponent(int i) throws IllegalArgumentException {
+    switch (i) {
+    case 0: // broadest component first
+      return rackName;
+
+    case 1:
+      return nodeName;
+
+    default:
+      throw new IllegalArgumentException(
+          "Host location component index out of range.");
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return rackName.hashCode() * 17 + nodeName.hashCode();
+  }
+
+  public static ParsedHost parse(String name) {
+    // separate out the node name
+    Matcher matcher = splitPattern.matcher(name);
+
+    if (!matcher.matches()) // the whole string must be "/<rack>/<host>"
+      return null;
+
+    return new ParsedHost(matcher.group(1), matcher.group(2));
+  }
+
+  public ParsedHost(LoggedLocation loc) {
+    List<String> coordinates = loc.getLayers();
+
+    rackName = coordinates.get(0); // requires at least two layers: rack, then node
+    nodeName = coordinates.get(1);
+  }
+
+  LoggedLocation makeLoggedLocation() {
+    LoggedLocation result = new LoggedLocation();
+
+    List<String> coordinates = new ArrayList<String>();
+
+    coordinates.add(rackName); // same order the LoggedLocation constructor reads
+    coordinates.add(nodeName);
+
+    result.setLayers(coordinates);
+
+    return result;
+  }
+  
+  String getNodeName() {
+    return nodeName;
+  }
+  
+  String getRackName() {
+    return rackName;
+  }
+
+  // expects the broadest name first
+  ParsedHost(String rackName, String nodeName) {
+    this.rackName = rackName;
+    this.nodeName = nodeName;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof ParsedHost)) {
+      return false;
+    }
+    ParsedHost host = (ParsedHost) other;
+    return (nodeName.equals(host.nodeName) && rackName.equals(host.rackName));
+  }
+
+  int distance(ParsedHost other) {
+    if (nodeName.equals(other.nodeName)) { // rack is not compared in this case
+      return 0;
+    }
+
+    if (rackName.equals(other.rackName)) { // different node, same rack
+      return 1;
+    }
+
+    return 2; // different node and different rack
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+class ParsedLine {
+  Properties content;
+  LogRecordType type;
+
+  static final Pattern keyValPair = Pattern
+      .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+
+  @SuppressWarnings("unused") 
+  ParsedLine(String fullLine, int version) {
+    super();
+
+    content = new Properties();
+
+    int firstSpace = fullLine.indexOf(" ");
+
+    if (firstSpace < 0) {
+      firstSpace = fullLine.length();
+    }
+
+    if (firstSpace == 0) {
+      return; // This is a junk line of some sort
+    }
+
+    type = LogRecordType.intern(fullLine.substring(0, firstSpace));
+
+    String propValPairs = fullLine.substring(firstSpace + 1);
+
+    while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
+      propValPairs = propValPairs.substring(1);
+    }
+
+    int cursor = 0;
+
+    while (cursor < propValPairs.length()) {
+      int equals = propValPairs.indexOf('=', cursor);
+
+      if (equals < 0) {
+        // maybe we do some error processing
+        return;
+      }
+
+      int nextCursor;
+
+      int endValue;
+
+      if (propValPairs.charAt(equals + 1) == '\"') {
+        int closeQuote = propValPairs.indexOf('\"', equals + 2);
+
+        nextCursor = closeQuote + 1;
+
+        endValue = closeQuote;
+
+        if (closeQuote < 0) {
+          endValue = propValPairs.length();
+
+          nextCursor = endValue;
+        }
+      } else {
+        int closeSpace = propValPairs.indexOf(' ', equals + 1);
+
+        if (closeSpace < 0) {
+          closeSpace = propValPairs.length();
+        }
+
+        endValue = closeSpace;
+
+        nextCursor = endValue;
+      }
+
+      content.setProperty(propValPairs.substring(cursor, equals), propValPairs
+          .substring(equals + 2, endValue));
+
+      cursor = nextCursor;
+
+      while (cursor < propValPairs.length()
+          && propValPairs.charAt(cursor) == ' ') {
+        ++cursor;
+      }
+    }
+  }
+
+  protected LogRecordType getType() {
+    return type;
+  }
+
+  protected String get(String key) {
+    return content.getProperty(key);
+  }
+
+  protected long getLong(String key) {
+    String val = get(key);
+
+    return Long.parseLong(val);
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Key and value names used in job history files (presumably the pre-0.21
+ * format, going by the class name -- confirm).
+ */
+public class Pre21JobHistoryConstants {
+  
+  /**
+   * Job history files contain key="value" pairs, where keys belong to this enum. 
+   * It acts as a global namespace for all keys. 
+   */
+  static enum Keys {
+    JOBTRACKERID,
+    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+    SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+    JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
+  }
+
+  /**
+   * This enum contains some of the values commonly used by history log events. 
+   * Since values in history can only be strings, Values.name() is used in 
+   * most places in the history file. 
+   */
+  public static enum Values {
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
+  }
+ 
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+
+/**
+ * {@link RackNode} represents a rack node in the cluster topology.
+ */
+public final class RackNode extends Node {
+  public RackNode(String name, int level) {
+    // Hack: ensuring rack name starts with "/".
+    super(name.startsWith("/") ? name : "/" + name, level); // NOTE(review): a null name fails here with NullPointerException
+  }
+  
+  @Override
+  public synchronized boolean addChild(Node child) {
+    if (!(child instanceof MachineNode)) { // also rejects a null child
+      throw new IllegalArgumentException(
+          "Only MachineNode can be added to RackNode");
+    }
+    return super.addChild(child);
+  }
+  
+  /**
+   * Get the machine nodes that belong to the rack.
+   * @return The machine nodes that belong to the rack, as returned by getChildren().
+   */
+  @SuppressWarnings({ "cast", "unchecked" })
+  public Set<MachineNode> getMachinesInRack() {
+    return (Set<MachineNode>)(Set)getChildren(); // raw cast; relies on addChild() admitting only MachineNode
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link ReduceTaskAttemptInfo} represents the information with regard to a
+ * reduce task attempt.
+ */
+public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
+  private long shuffleTime; // runtime of the shuffle phase
+  private long mergeTime; // runtime of the merge phase
+  private long reduceTime; // runtime of the reduce phase
+
+  public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+      long mergeTime, long reduceTime) {
+    super(state, taskInfo); // the superclass constructor validates state
+    this.shuffleTime = shuffleTime;
+    this.mergeTime = mergeTime;
+    this.reduceTime = reduceTime;
+  }
+
+  /**
+   * Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
+   * 
+   * @return the runtime for the <b>reduce</b> phase of the reduce task-attempt
+   */
+  public long getReduceRuntime() {
+    return reduceTime;
+  }
+
+  /**
+   * Get the runtime for the <b>shuffle</b> phase of the reduce task-attempt.
+   * 
+   * @return the runtime for the <b>shuffle</b> phase of the reduce task-attempt
+   */
+  public long getShuffleRuntime() {
+    return shuffleTime;
+  }
+
+  /**
+   * Get the runtime for the <b>merge</b> phase of the reduce task-attempt.
+   * 
+   * @return the runtime for the <b>merge</b> phase of the reduce task-attempt
+   */
+  public long getMergeRuntime() {
+    return mergeTime;
+  }
+
+  @Override
+  public long getRuntime() { // total runtime = shuffle + merge + reduce
+    return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link TaskAttemptInfo} is a collection of statistics about a particular
+ * task-attempt gleaned from job-history of the job.
+ */
+public abstract class TaskAttemptInfo {
+  protected final State state;
+  protected final TaskInfo taskInfo;
+
+  protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+    if (state == State.SUCCEEDED || state == State.FAILED) { // only these two states are accepted
+      this.state = state;
+    } else {
+      throw new IllegalArgumentException("status cannot be " + state);
+    }
+    this.taskInfo = taskInfo; // stored as given; no null check
+  }
+
+  /**
+   * Get the final {@link TaskStatus.State} of the task-attempt.
+   * 
+   * @return the final <code>State</code> of the task-attempt
+   */
+  public State getRunState() {
+    return state;
+  }
+
+  /**
+   * Get the total runtime for the task-attempt.
+   * 
+   * @return the total runtime for the task-attempt
+   */
+  public abstract long getRuntime();
+
+  /**
+   * Get the {@link TaskInfo} for the given task-attempt.
+   * 
+   * @return the <code>TaskInfo</code> for the given task-attempt
+   */
+  public TaskInfo getTaskInfo() {
+    return taskInfo;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Immutable value holder for the byte counts, record counts and memory figure
+ * recorded for a single task.
+ */
+public class TaskInfo {
+  private final long bytesIn;
+  private final long bytesOut;
+  private final int recsIn;
+  private final int recsOut;
+  private final long maxMemory;
+
+  /**
+   * @param bytesIn
+   *          value reported by {@link #getInputBytes()}
+   * @param recsIn
+   *          value reported by {@link #getInputRecords()}
+   * @param bytesOut
+   *          value reported by {@link #getOutputBytes()}
+   * @param recsOut
+   *          value reported by {@link #getOutputRecords()}
+   * @param maxMemory
+   *          value reported by {@link #getTaskMemory()}
+   */
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory) {
+    this.bytesIn = bytesIn;
+    this.bytesOut = bytesOut;
+    this.recsIn = recsIn;
+    this.recsOut = recsOut;
+    this.maxMemory = maxMemory;
+  }
+
+  /**
+   * @return Raw bytes read from the FileSystem into the task. This is not
+   *         guaranteed to equal the input bytes of the task.
+   */
+  public long getInputBytes() {
+    return this.bytesIn;
+  }
+
+  /**
+   * @return Number of records fed into this task.
+   */
+  public int getInputRecords() {
+    return this.recsIn;
+  }
+
+  /**
+   * @return Raw bytes written to the destination FileSystem. This is not
+   *         guaranteed to equal the output bytes of the task.
+   */
+  public long getOutputBytes() {
+    return this.bytesOut;
+  }
+
+  /**
+   * @return Number of records produced by this task.
+   */
+  public int getOutputRecords() {
+    return this.recsOut;
+  }
+
+  /**
+   * @return Memory used by the task, which is less than or equal to the heap
+   *         size.
+   */
+  public long getTaskMemory() {
+    return this.maxMemory;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * One link in a chain that leads from a node back up to the root. The rumen
+ * unit tests use it while comparing two trees: when the trees differ, the
+ * chain is rendered as a string that names the path from the root down to the
+ * field that did not compare.
+ * 
+ */
+public class TreePath {
+  final TreePath parent;
+
+  final String fieldName;
+
+  final int index;
+
+  /**
+   * Create a path element without an index (the index is stored as -1).
+   * 
+   * @param parent
+   *          the enclosing path element, or null for the root
+   * @param fieldName
+   *          name of the field this element stands for
+   */
+  public TreePath(TreePath parent, String fieldName) {
+    this(parent, fieldName, -1);
+  }
+
+  /**
+   * Create a path element that carries an index.
+   * 
+   * @param parent
+   *          the enclosing path element, or null for the root
+   * @param fieldName
+   *          name of the field this element stands for
+   * @param index
+   *          index to print after the field name; -1 suppresses it
+   */
+  public TreePath(TreePath parent, String fieldName, int index) {
+    super();
+
+    this.parent = parent;
+    this.fieldName = fieldName;
+    this.index = index;
+  }
+
+  /**
+   * Render the path from the root down to this element, with segments joined
+   * by <code>--&gt;</code> and indexed segments written as
+   * <code>name[index]</code>.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+
+    if (parent != null) {
+      text.append(parent.toString()).append("-->");
+    }
+
+    text.append(fieldName);
+    if (index != -1) {
+      text.append("[").append(index).append("]");
+    }
+
+    return text.toString();
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ZombieCluster} rebuilds the cluster topology using the information
+ * obtained from job history logs.
+ */
+public class ZombieCluster extends AbstractClusterStory {
+  private Node root;
+
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves on the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes are on the same level.
+   * 
+   * @param topology
+   *          The network topology.
+   * @param defaultNode
+   *          The default node setting. If non-null, every machine node is
+   *          cloned from it.
+   * @throws IllegalArgumentException
+   *           if the leaves are not all on the same level, or if the topology
+   *           has no level below its root
+   */
+  ZombieCluster(LoggedNetworkTopology topology, MachineNode defaultNode) {
+    buildCluster(topology, defaultNode);
+  }
+
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves on the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes are on the same level.
+   * 
+   * @param path
+   *          Path to the JSON-encoded topology file.
+   * @param defaultNode
+   *          The default node setting.
+   * @param conf
+   *          The configuration handed to the {@link ClusterTopologyReader}.
+   * @throws IOException
+   *           propagated from the {@link ClusterTopologyReader}
+   */
+  public ZombieCluster(Path path, MachineNode defaultNode, Configuration conf) throws IOException {
+    this(new ClusterTopologyReader(path, conf).get(), defaultNode);
+  }
+  
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves on the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes are on the same level.
+   * 
+   * @param input
+   *          The input stream for the JSON-encoded topology file.
+   * @param defaultNode
+   *          The default node setting.
+   * @throws IOException
+   *           propagated from the {@link ClusterTopologyReader}
+   */
+  public ZombieCluster(InputStream input, MachineNode defaultNode) throws IOException {
+    this(new ClusterTopologyReader(input).get(), defaultNode);
+  }
+
+  @Override
+  public Node getClusterTopology() {
+    return root;
+  }
+
+  /**
+   * Convert the logged topology tree into a tree of {@link Node}s and store
+   * its root. Leaves become {@link MachineNode}s, their parents become
+   * {@link RackNode}s, and everything above becomes a plain {@link Node}.
+   * 
+   * @param topology
+   *          root of the logged topology
+   * @param defaultNode
+   *          template cloned into every machine node; ignored when null
+   * @throws IllegalArgumentException
+   *           if the leaves are not all on the same level, or if the root
+   *           itself is a leaf
+   */
+  private final void buildCluster(LoggedNetworkTopology topology,
+      MachineNode defaultNode) {
+    Map<LoggedNetworkTopology, Integer> levelMapping = 
+      new IdentityHashMap<LoggedNetworkTopology, Integer>();
+    Deque<LoggedNetworkTopology> unvisited = 
+      new ArrayDeque<LoggedNetworkTopology>();
+    unvisited.add(topology);
+    levelMapping.put(topology, 0);
+    
+    // First pass: record the level of every node and determine leafLevel.
+    int leafLevel = -1; // -1 means leafLevel unknown.
+    for (LoggedNetworkTopology n = unvisited.poll(); n != null; 
+      n = unvisited.poll()) {
+      int level = levelMapping.get(n);
+      List<LoggedNetworkTopology> children = n.getChildren();
+      if (children == null || children.isEmpty()) {
+        if (leafLevel == -1) {
+          leafLevel = level;
+        } else if (leafLevel != level) {
+          throw new IllegalArgumentException(
+              "Leaf nodes are not on the same level");
+        }
+      } else {
+        for (LoggedNetworkTopology child : children) {
+          levelMapping.put(child, level + 1);
+          unvisited.addFirst(child);
+        }
+      }
+    }
+
+    // A root without children would leave path[] empty, and reading path[0]
+    // below would fail with an ArrayIndexOutOfBoundsException. Reject it
+    // here with a message that names the actual problem.
+    if (leafLevel < 1) {
+      throw new IllegalArgumentException(
+          "Topology must have at least one level below the root");
+    }
+
+    /**
+     * A second-pass dfs traverse of topology tree. path[i] contains the parent
+     * of the node at level i+1.
+     */
+    Node[] path = new Node[leafLevel];
+    unvisited.add(topology);
+    for (LoggedNetworkTopology n = unvisited.poll(); n != null; 
+      n = unvisited.poll()) {
+      int level = levelMapping.get(n);
+      Node current;
+      if (level == leafLevel) { // a machine node
+        MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
+        if (defaultNode != null) {
+          builder.cloneFrom(defaultNode);
+        }
+        current = builder.build();
+      } else {
+        current = (level == leafLevel - 1) 
+          ? new RackNode(n.getName(), level) : 
+            new Node(n.getName(), level);
+        path[level] = current;
+        // Add all children to the front of the queue.
+        for (LoggedNetworkTopology child : n.getChildren()) {
+          unvisited.addFirst(child);
+        }
+      }
+      if (level != 0) {
+        path[level - 1].addChild(current);
+      }
+    }
+
+    root = path[0];
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,880 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+/**
+ * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects.
+ * 
+ * Each {@link ZombieJob} object represents a job in job history. For everything
+ * that exists in job history, contents are returned unchanged faithfully. To
+ * get input splits of a non-exist task, a non-exist task attempt, or an
+ * ill-formed task attempt, proper objects are made up from statistical
+ * sketches.
+ */
+@SuppressWarnings("deprecation")
+public class ZombieJob implements JobStory {
+  static final Log LOG = LogFactory.getLog(ZombieJob.class);
+  private final LoggedJob job;
+  private Map<TaskID, LoggedTask> loggedTaskMap;
+  private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap;
+  private final Random random;
+  private InputSplit[] splits;
+  private final ClusterStory cluster;
+  private JobConf jobConf;
+
+  private long seed;
+  private boolean hasRandomSeed = false;
+
+  private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
+      new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();
+
+  // TODO: Fix ZombieJob to initialize this correctly from observed data
+  double rackLocalOverNodeLocal = 1.5;
+  double rackRemoteOverNodeLocal = 3.0;
+
+  /**
+   * Create a {@link ZombieJob} backed by the given {@link LoggedJob}, using an
+   * explicit seed for the random number generator.
+   * 
+   * @param job
+   *          The dead job this ZombieJob instance is based on. Must not be
+   *          null.
+   * @param cluster
+   *          The cluster topology where the dead job ran on. This argument can
+   *          be null if we do not have knowledge of the cluster topology.
+   * @param seed
+   *          Seed for the random number generator for filling in information
+   *          not available from the ZombieJob.
+   * @throws IllegalArgumentException
+   *           if <code>job</code> is null
+   */
+  public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) {
+    if (job == null) {
+      throw new IllegalArgumentException("job is null");
+    }
+    this.job = job;
+    this.cluster = cluster;
+    this.seed = seed;
+    this.random = new Random(seed);
+    this.hasRandomSeed = true;
+  }
+
+  /**
+   * Create a {@link ZombieJob} backed by the given {@link LoggedJob}. The
+   * random number generator is seeded with the current value of
+   * {@link System#nanoTime()}.
+   * 
+   * @param job
+   *          The dead job this ZombieJob instance is based on.
+   * @param cluster
+   *          The cluster topology where the dead job ran on. This argument can
+   *          be null if we do not have knowledge of the cluster topology.
+   */
+  public ZombieJob(LoggedJob job, ClusterStory cluster) {
+    this(job, cluster, System.nanoTime());
+  }
+
+  /**
+   * Translate a job-history status value into the matching {@link State}.
+   * 
+   * @param status
+   *          one of SUCCESS, FAILED or KILLED
+   * @return SUCCEEDED, FAILED or KILLED respectively
+   * @throws IllegalArgumentException
+   *           for any other value, including null
+   */
+  private static State convertState(Values status) {
+    if (status == Values.SUCCESS) {
+      return State.SUCCEEDED;
+    }
+    if (status == Values.FAILED) {
+      return State.FAILED;
+    }
+    if (status == Values.KILLED) {
+      return State.KILLED;
+    }
+    throw new IllegalArgumentException("unknown status " + status);
+  }
+
+  /**
+   * Return the {@link JobConf} of this job, creating it on first use. The
+   * created conf carries the job name, user, and the numbers of map and reduce
+   * tasks.
+   */
+  @Override
+  public synchronized JobConf getJobConf() {
+    if (jobConf != null) {
+      return jobConf;
+    }
+
+    // TODO : add more to jobConf ?
+    jobConf = new JobConf();
+    jobConf.setJobName(getName());
+    jobConf.setUser(getUser());
+    jobConf.setNumMapTasks(getNumberMaps());
+    jobConf.setNumReduceTasks(getNumberReduces());
+    return jobConf;
+  }
+  
+  /**
+   * Return the input splits of this job, building and caching them on first
+   * use.
+   * 
+   * One {@link FileSplit} is created per logged map task whose task type is
+   * MAP. Its length is the task's input bytes (0 if that value is negative)
+   * and its hosts are the last layer of each preferred location. Locations
+   * with null or empty layers, or with a null host, are skipped with a
+   * warning. If the job reports more maps than were logged, zero-length splits
+   * are made up for the missing ones: without hosts when no cluster is known,
+   * otherwise on machines picked by the cluster.
+   */
+  @Override
+  public InputSplit[] getInputSplits() {
+    if (splits == null) {
+      List<InputSplit> splitsList = new ArrayList<InputSplit>();
+      Path emptyPath = new Path("/");
+      int totalHosts = 0; // use to determine avg # of hosts per split.
+      for (LoggedTask mapTask : job.getMapTasks()) {
+        Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType();
+        if (taskType != Pre21JobHistoryConstants.Values.MAP) {
+          LOG.warn("TaskType for a MapTask is not Map. task="
+              + mapTask.getTaskID() + " type="
+              + ((taskType == null) ? "null" : taskType.toString()));
+          continue;
+        }
+        List<LoggedLocation> locations = mapTask.getPreferredLocations();
+        List<String> hostList = new ArrayList<String>();
+        if (locations != null) {
+          for (LoggedLocation location : locations) {
+            List<String> layers = location.getLayers();
+            // Null layers are treated like empty ones, as getLocality() does,
+            // instead of failing with a NullPointerException.
+            if (layers == null || layers.isEmpty()) {
+              LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
+              continue;
+            }
+            String host = layers.get(layers.size() - 1);
+            if (host == null) {
+              LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
+              continue;
+            }
+            hostList.add(host);
+          }
+        }
+        String[] hosts = hostList.toArray(new String[hostList.size()]);
+        totalHosts += hosts.length;
+        long mapInputBytes = getTaskInfo(mapTask).getInputBytes();
+        if (mapInputBytes < 0) {
+          LOG.warn("InputBytes for task "+mapTask.getTaskID()+" is not defined.");
+          mapInputBytes = 0;
+        }
+       
+        splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts));
+      }
+
+      // If not all map tasks are in job trace, should make up some splits
+      // for missing map tasks.
+      int totalMaps = job.getTotalMaps();
+      if (totalMaps < splitsList.size()) {
+        LOG.warn("TotalMaps for job " + job.getJobID()
+            + " is less than the total number of map task descriptions ("
+            + totalMaps + "<" + splitsList.size() + ").");
+      }
+
+      // Made-up splits get the average number of hosts seen on logged splits;
+      // fall back to 3 when there is nothing to average or the average is 0.
+      int avgHostPerSplit;
+      if (splitsList.size() == 0) {
+        avgHostPerSplit = 3;
+      } else {
+        avgHostPerSplit = totalHosts / splitsList.size();
+        if (avgHostPerSplit == 0) {
+          avgHostPerSplit = 3;
+        }
+      }
+
+      for (int i = splitsList.size(); i < totalMaps; i++) {
+        if (cluster == null) {
+          splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
+        } else {
+          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
+          String[] hosts = new String[mNodes.length];
+          for (int j = 0; j < hosts.length; ++j) {
+            hosts[j] = mNodes[j].getName();
+          }
+          // TODO set size of a split to 0 now.
+          splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
+        }
+      }
+
+      splits = splitsList.toArray(new InputSplit[splitsList.size()]);
+    }
+    return splits;
+  }
+
+  /**
+   * Return the logged job name, or the placeholder
+   * <code>"(name unknown)"</code> when the trace has none.
+   */
+  @Override
+  public String getName() {
+    String jobName = job.getJobName();
+    return (jobName != null) ? jobName : "(name unknown)";
+  }
+
+  @Override
+  public JobID getJobID() {
+    return JobID.forName(getLoggedJob().getJobID());
+  }
+
+  /**
+   * Replace the "undefined" marker -1 with a default value, logging a warning
+   * when that happens.
+   * 
+   * @param oldVal
+   *          value read from the trace; -1 means undefined
+   * @param defaultVal
+   *          value to return in place of -1
+   * @param name
+   *          name of the value, used in the warning
+   * @param id
+   *          identifier the value belongs to, used in the warning
+   * @return <code>oldVal</code>, or <code>defaultVal</code> if
+   *         <code>oldVal</code> is -1
+   */
+  private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
+    if (oldVal != -1) {
+      return oldVal;
+    }
+    LOG.warn(name + " not defined for " + id);
+    return defaultVal;
+  }
+  
+  /**
+   * Return the total number of maps recorded for the job, or 0 (with a
+   * warning) when the trace holds -1 for it.
+   */
+  @Override
+  public int getNumberMaps() {
+    return sanitizeValue(
+        job.getTotalMaps(), 0, "NumberMaps", job.getJobID());
+  }
+
+  /**
+   * Return the total number of reduces recorded for the job, or 0 (with a
+   * warning) when the trace holds -1 for it.
+   */
+  @Override
+  public int getNumberReduces() {
+    return sanitizeValue(
+        job.getTotalReduces(), 0, "NumberReduces", job.getJobID());
+  }
+
+  @Override
+  public Values getOutcome() {
+    return job.getOutcome();
+  }
+
+  /**
+   * Return the logged submit time of the job minus its logged relative time.
+   */
+  @Override
+  public long getSubmissionTime() {
+    return job.getSubmitTime() - job.getRelativeTime();
+  }
+
+  /**
+   * Count the map tasks that are actually present in the trace.
+   * 
+   * @return the size of the logged job's list of map tasks
+   */
+  public int getNumLoggedMaps() {
+    return job.getMapTasks().size();
+  }
+
+
+  /**
+   * Count the reduce tasks that are actually present in the trace.
+   * 
+   * @return the size of the logged job's list of reduce tasks
+   */
+  public int getNumLoggedReduces() {
+    return job.getReduceTasks().size();
+  }
+  
+  /**
+   * Strip the job ID from a {@link TaskID}, keeping only its map/reduce flag
+   * and its task number.
+   * 
+   * @param taskId
+   *          raw {@link TaskID} read from trace
+   * @return a new {@link TaskID} built on a default-constructed {@link JobID}
+   */
+  private TaskID maskTaskID(TaskID taskId) {
+    return new TaskID(new JobID(), taskId.isMap(), taskId.getId());
+  }
+
+  /**
+   * Strip the job ID from a {@link TaskAttemptID}, keeping its map/reduce
+   * flag, its task number and its attempt number.
+   * 
+   * @param attemptId
+   *          raw {@link TaskAttemptID} read from trace
+   * @return a new {@link TaskAttemptID} built on the identifier and id of a
+   *         default-constructed {@link JobID}
+   */
+  private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
+    JobID emptyJobId = new JobID();
+    int taskNumber = attemptId.getTaskID().getId();
+    return new TaskAttemptID(emptyJobId.getJtIdentifier(), emptyJobId.getId(),
+        attemptId.isMap(), taskNumber, attemptId.getId());
+  }
+
+  /**
+   * Filter out logged tasks that lack the fields the rest of this class relies
+   * on.
+   * 
+   * @param task
+   *          task read from the trace, may be null
+   * @return <code>task</code> itself, or null if it is null or has a null task
+   *         type or a null task status (a warning is logged for the latter two)
+   */
+  private LoggedTask sanitizeLoggedTask(LoggedTask task) {
+    if (task == null) {
+      return null;
+    }
+    if (task.getTaskType() == null) {
+      LOG.warn("Task " + task.getTaskID() + " has null TaskType");
+      return null;
+    }
+    if (task.getTaskStatus() == null) {
+      LOG.warn("Task " + task.getTaskID() + " has null TaskStatus");
+      return null;
+    }
+    return task;
+  }
+
+  /**
+   * Filter out logged task attempts that have no result.
+   * 
+   * @param attempt
+   *          attempt read from the trace, may be null
+   * @return <code>attempt</code> itself, or null if it is null or its result
+   *         is null (a warning naming the attempt is logged in that case)
+   */
+  private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
+    if (attempt == null) {
+      return null;
+    }
+    if (attempt.getResult() == null) {
+      // Identify the attempt by its ID; the result is null here, so printing
+      // it would not say which attempt was dropped.
+      LOG.warn("TaskAttempt " + attempt.getAttemptID() + " has null Result");
+      return null;
+    }
+
+    return attempt;
+  }
+
+  /**
+   * Populate the task and task-attempt lookup tables on first call; later
+   * calls return immediately. Keys are the masked {@link TaskID}s and
+   * {@link TaskAttemptID}s, so lookups do not depend on the job ID.
+   */
+  private synchronized void buildMaps() {
+    if (loggedTaskMap != null) {
+      return;
+    }
+    loggedTaskMap = new HashMap<TaskID, LoggedTask>();
+    loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
+
+    indexLoggedTasks(job.getMapTasks());
+    indexLoggedTasks(job.getReduceTasks());
+
+    // TODO: do not care about "other" tasks, "setup" or "clean"
+  }
+
+  /**
+   * Add every task that passes sanitizing, and each of its attempts that
+   * passes sanitizing, to the lookup tables.
+   */
+  private void indexLoggedTasks(Iterable<? extends LoggedTask> tasks) {
+    for (LoggedTask rawTask : tasks) {
+      LoggedTask task = sanitizeLoggedTask(rawTask);
+      if (task == null) {
+        continue;
+      }
+      loggedTaskMap.put(maskTaskID(TaskID.forName(task.taskID)), task);
+
+      for (LoggedTaskAttempt rawAttempt : task.getAttempts()) {
+        LoggedTaskAttempt attempt = sanitizeLoggedTaskAttempt(rawAttempt);
+        if (attempt == null) {
+          continue;
+        }
+        TaskAttemptID id = TaskAttemptID.forName(attempt.getAttemptID());
+        loggedTaskAttemptMap.put(maskAttemptID(id), attempt);
+      }
+    }
+  }
+
+  /**
+   * Return the logged user of the job, or the placeholder
+   * <code>"(unknown)"</code> when the trace has none.
+   */
+  @Override
+  public String getUser() {
+    String loggedUser = job.getUser();
+    if (loggedUser == null) {
+      return "(unknown)";
+    }
+    return loggedUser;
+  }
+
+  /**
+   * Expose the underlying {@link LoggedJob} object read directly from the
+   * trace. This is mainly for debugging.
+   * 
+   * @return the underlying {@link LoggedJob} object
+   */
+  public LoggedJob getLoggedJob() {
+    return this.job;
+  }
+
+  /**
+   * Get a {@link TaskAttemptInfo} for the attempt identified by taskType,
+   * taskNumber and taskAttemptNumber, ignoring locality. The decision logic
+   * is:
+   * 1. If the task is missing in trace, make up a {@link TaskAttemptInfo} from
+   * an all-zero {@link TaskInfo}.
+   * 2. If the task exists but the attempt is missing, or the attempt has a
+   * KILLED final status, make up a {@link TaskAttemptInfo} from the task's
+   * {@link TaskInfo}.
+   * 3. Otherwise (final state is SUCCEEDED or FAILED), construct the
+   * {@link TaskAttemptInfo} from the trace.
+   */
+  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    // Locality is not considered here: made-up attempts are assumed to be
+    // NODE_LOCAL. When both task and attempt exist in trace, the logged
+    // locality is used.
+    final int nodeLocal = 0;
+
+    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+    if (loggedTask == null) {
+      // TODO insert parameters
+      return makeUpTaskAttemptInfo(taskType, new TaskInfo(0, 0, 0, 0, 0),
+          taskAttemptNumber, taskNumber, nodeLocal);
+    }
+
+    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+        taskNumber, taskAttemptNumber);
+    // TODO should we handle killed attempts later?
+    boolean mustMakeUp = (loggedAttempt == null)
+        || (loggedAttempt.getResult() == Values.KILLED);
+    if (mustMakeUp) {
+      // The task exists, but the attempt is missing or was killed.
+      return makeUpTaskAttemptInfo(taskType, getTaskInfo(loggedTask),
+          taskAttemptNumber, taskNumber, nodeLocal);
+    }
+    return getTaskAttemptInfo(loggedTask, loggedAttempt);
+  }
+
+  @Override
+  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+    return getTaskInfo(getLoggedTask(taskType, taskNumber));
+  }
+
+  /**
+   * Get a {@link TaskAttemptInfo} for the map attempt identified by taskNumber
+   * and taskAttemptNumber, taking locality into account. The decision logic
+   * is:
+   * 1. If the task or the attempt is missing in trace, make up a
+   * {@link TaskAttemptInfo}.
+   * 2. If the attempt has a KILLED final status in trace, make up a
+   * {@link TaskAttemptInfo}.
+   * 3. If the final state is FAILED, construct a {@link TaskAttemptInfo} from
+   * the trace, without considering locality.
+   * 4. If the final state is SUCCEEDED, construct a {@link TaskAttemptInfo}
+   * from the trace, with the runtime scaled according to locality in
+   * simulation and locality in trace.
+   * 
+   * @throws IllegalArgumentException
+   *           if the logged attempt result is none of the three above
+   */
+  @Override
+  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+      int taskAttemptNumber, int locality) {
+    final TaskType taskType = TaskType.MAP;
+
+    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+    if (loggedTask == null) {
+      // TODO insert parameters
+      return makeUpTaskAttemptInfo(taskType, new TaskInfo(0, 0, 0, 0, 0),
+          taskAttemptNumber, taskNumber, locality);
+    }
+
+    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+        taskNumber, taskAttemptNumber);
+    if (loggedAttempt == null) {
+      // Task exists, but attempt is missing.
+      return makeUpTaskAttemptInfo(taskType, getTaskInfo(loggedTask),
+          taskAttemptNumber, taskNumber, locality);
+    }
+
+    // Task and TaskAttempt both exist.
+    Values result = loggedAttempt.getResult();
+    if (result == Values.KILLED) {
+      return makeUpTaskAttemptInfo(taskType, getTaskInfo(loggedTask),
+          taskAttemptNumber, taskNumber, locality);
+    }
+    if (result == Values.FAILED) {
+      /**
+       * FAILED attempt is not affected by locality however, made-up FAILED
+       * attempts ARE affected by locality, since statistics are present for
+       * attempts of different locality.
+       */
+      return getTaskAttemptInfo(loggedTask, loggedAttempt);
+    }
+    if (result != Values.SUCCESS) {
+      throw new IllegalArgumentException(
+          "attempt result is not SUCCEEDED, FAILED or KILLED: " + result);
+    }
+
+    int loggedLocality = getLocality(loggedTask, loggedAttempt);
+    if (locality == loggedLocality) {
+      return getTaskAttemptInfo(loggedTask, loggedAttempt);
+    }
+    // The attempt succeeded in trace, but the simulation schedules it with a
+    // different locality.
+    return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality,
+        rackLocalOverNodeLocal, rackRemoteOverNodeLocal);
+  }
+
+  /**
+   * Replace a negative running time with a default of 100ms, logging a
+   * warning when that happens.
+   * 
+   * @param time
+   *          running time computed from the trace
+   * @param id
+   *          identifier of the task, used in the warning
+   * @return <code>time</code> if it is not negative, otherwise 100
+   */
+  private long sanitizeTaskRuntime(long time, String id) {
+    if (time >= 0) {
+      return time;
+    }
+    LOG.warn("Negative running time for task " + id + ": " + time);
+    return 100L; // set default to 100ms.
+  }
+  
+  /**
+   * Build a {@link MapTaskAttemptInfo} whose runtime is the logged runtime
+   * scaled by the ratio between the cost factor of the simulated locality and
+   * that of the logged locality. The factors are indexed by locality: 1.0,
+   * then rackLocalOverNodeLocal, then rackRemoteOverNodeLocal.
+   * 
+   * @throws IllegalArgumentException
+   *           if the logged task is not a MAP task
+   */
+  @SuppressWarnings("hiding") 
+  private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
+      LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
+      double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
+    TaskInfo taskInfo = getTaskInfo(loggedTask);
+    double[] factors = { 1.0, rackLocalOverNodeLocal, rackRemoteOverNodeLocal };
+    double scaleFactor = factors[locality] / factors[loggedLocality];
+    State state = convertState(loggedAttempt.getResult());
+
+    if (loggedTask.getTaskType() != Values.MAP) {
+      throw new IllegalArgumentException("taskType can only be MAP: "
+          + loggedTask.getTaskType());
+    }
+
+    // A start time of 0 means the runtime has to be made up.
+    long taskTime = (loggedAttempt.getStartTime() == 0)
+        ? makeUpMapRuntime(state, locality)
+        : loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+    taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
+    // Multiply in double precision, then narrow the product back to long.
+    taskTime = (long) (taskTime * scaleFactor);
+    return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+  }
+
+  /**
+   * Determine the locality of a logged attempt: the smallest distance in the
+   * cluster between the machine the attempt ran on and any of the task's
+   * preferred locations.
+   * 
+   * @param loggedTask
+   *          task whose preferred locations are consulted
+   * @param loggedAttempt
+   *          attempt whose host name is consulted
+   * @return 0 if no cluster topology is known; the cluster's maximum distance
+   *         if the attempt's host is unknown or no preferred location resolves
+   *         to a machine; otherwise the minimum distance found
+   */
+  private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
+    if (cluster == null) {
+      // The cluster may legitimately be null (see the constructor), in which
+      // case there is no topology to measure distances in. Fall back to 0,
+      // the node-local default that getTaskAttemptInfo() also assumes.
+      return 0;
+    }
+    int distance = cluster.getMaximumDistance();
+    String rackHostName = loggedAttempt.getHostName();
+    if (rackHostName == null) {
+      return distance;
+    }
+    MachineNode mn = getMachineNode(rackHostName);
+    if (mn == null) {
+      return distance;
+    }
+    List<LoggedLocation> locations = loggedTask.getPreferredLocations();
+    if (locations != null) {
+      for (LoggedLocation location : locations) {
+        List<String> layers = location.getLayers();
+        if ((layers == null) || (layers.isEmpty())) {
+          continue;
+        }
+        // The last layer names the machine that holds the data.
+        String dataNodeName = layers.get(layers.size()-1);
+        MachineNode dataNode = cluster.getMachineByName(dataNodeName);
+        if (dataNode != null) {
+          distance = Math.min(distance, cluster.distance(mn, dataNode));
+        }
+      }
+    }
+    return distance;
+  }
+
+  /**
+   * Look up the {@link MachineNode} for a host name as logged in the trace.
+   * If the name can be parsed by {@link ParsedHost}, the parsed node name is
+   * used for the lookup; otherwise the name is used as is.
+   * 
+   * @param rackHostName
+   *          host name as logged in the trace
+   * @return the matching machine, or null if the resolved name is null or no
+   *         cluster is known
+   */
+  private MachineNode getMachineNode(String rackHostName) {
+    ParsedHost parsedHost = ParsedHost.parse(rackHostName);
+    String hostName = rackHostName;
+    if (parsedHost != null) {
+      hostName = parsedHost.getNodeName();
+    }
+    if (hostName == null || cluster == null) {
+      return null;
+    }
+    return cluster.getMachineByName(hostName);
+  }
+
+  private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
+      LoggedTaskAttempt loggedAttempt) {
+    TaskInfo taskInfo = getTaskInfo(loggedTask);
+    State state = convertState(loggedAttempt.getResult());
+    if (loggedTask.getTaskType() == Values.MAP) {
+      long taskTime;
+      if (loggedAttempt.getStartTime() == 0) {
+        int locality = getLocality(loggedTask, loggedAttempt);
+        taskTime = makeUpMapRuntime(state, locality);
+      } else {
+        taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+      }
+      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
+      return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+    } else if (loggedTask.getTaskType() == Values.REDUCE) {
+      long startTime = loggedAttempt.getStartTime();
+      long mergeDone = loggedAttempt.getSortFinished();
+      long shuffleDone = loggedAttempt.getShuffleFinished();
+      long finishTime = loggedAttempt.getFinishTime();
+      if (startTime <= 0 || startTime >= finishTime) {
+        // have seen startTime>finishTime.
+        // haven't seen reduce task with startTime=0 ever. But if this happens,
+        // make up a reduceTime with no shuffle/merge.
+        long reduceTime = makeUpReduceRuntime(state);
+        return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+      } else {
+        if (shuffleDone <= 0) {
+          shuffleDone = startTime;
+        }
+        if (mergeDone <= 0) {
+          mergeDone = finishTime;
+        }
+        long shuffleTime = shuffleDone - startTime;
+        long mergeTime = mergeDone - shuffleDone;
+        long reduceTime = finishTime - mergeDone;
+        reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
+        
+        return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
+            mergeTime, reduceTime);
+      }
+    } else {
+      throw new IllegalArgumentException("taskType for "
+          + loggedTask.getTaskID() + " is neither MAP nor REDUCE: "
+          + loggedTask.getTaskType());
+    }
+  }
+
+  /**
+   * Summarizes a logged task's I/O volume and memory demand from its first
+   * usable successful attempt.
+   *
+   * @param loggedTask
+   *          the task to summarize; must be a MAP or REDUCE task.
+   * @return a TaskInfo whose fields are all -1 when no successful attempt
+   *         survives sanitizeLoggedTaskAttempt(). Record counts and heap size
+   *         that do not fit in an int are saturated to the int range instead
+   *         of wrapping around.
+   * @throws IllegalArgumentException
+   *           if the task type is neither MAP nor REDUCE (including null).
+   */
+  private TaskInfo getTaskInfo(LoggedTask loggedTask) {
+    List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();
+
+    long inputBytes = -1;
+    long inputRecords = -1;
+    long outputBytes = -1;
+    long outputRecords = -1;
+    long heapMegabytes = -1;
+
+    Values type = loggedTask.getTaskType();
+    if ((type != Values.MAP) && (type != Values.REDUCE)) {
+      // Concatenate the type instead of calling type.toString(): a null type
+      // must produce this exception, not a NullPointerException.
+      throw new IllegalArgumentException(
+          "getTaskInfo only supports MAP or REDUCE tasks: " + type
+              + " for task = " + loggedTask.getTaskID());
+    }
+
+    for (LoggedTaskAttempt attempt : attempts) {
+      attempt = sanitizeLoggedTaskAttempt(attempt);
+      // ignore bad attempts or unsuccessful attempts.
+      if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
+        continue;
+      }
+
+      if (type == Values.MAP) {
+        inputBytes = attempt.getHdfsBytesRead();
+        inputRecords = attempt.getMapInputRecords();
+        // Map output goes to the shuffle when the job has reduces, otherwise
+        // the HDFS bytes written are used.
+        outputBytes =
+            (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt
+                .getHdfsBytesWritten();
+        outputRecords = attempt.getMapOutputRecords();
+        heapMegabytes =
+            (job.getJobMapMB() > 0) ? job.getJobMapMB() : job
+                .getHeapMegabytes();
+      } else {
+        inputBytes = attempt.getReduceShuffleBytes();
+        inputRecords = attempt.getReduceInputRecords();
+        outputBytes = attempt.getHdfsBytesWritten();
+        outputRecords = attempt.getReduceOutputRecords();
+        heapMegabytes =
+            (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
+                .getHeapMegabytes();
+      }
+      // Only the first successful attempt is used.
+      break;
+    }
+
+    return new TaskInfo(inputBytes, saturatedIntCast(inputRecords),
+        outputBytes, saturatedIntCast(outputRecords),
+        saturatedIntCast(heapMegabytes));
+  }
+
+  /**
+   * Narrows a long to an int, clamping to [Integer.MIN_VALUE,
+   * Integer.MAX_VALUE] so that out-of-range values do not wrap around.
+   */
+  private static int saturatedIntCast(long value) {
+    return (int) Math.max(Integer.MIN_VALUE,
+        Math.min(Integer.MAX_VALUE, value));
+  }
+
+  /**
+   * Builds the attempt ID for the given task and attempt numbers under this
+   * job's ID.
+   *
+   * @param taskType
+   *          the task type; anything other than TaskType.MAP is treated as a
+   *          non-map task.
+   * @param taskNumber
+   *          the task's index.
+   * @param taskAttemptNumber
+   *          the attempt's index within the task.
+   * @return the assembled attempt ID.
+   */
+  private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    JobID jobId = JobID.forName(job.getJobID());
+    boolean isMap = (taskType == TaskType.MAP);
+    TaskID taskId = new TaskID(jobId, isMap, taskNumber);
+    return new TaskAttemptID(taskId, taskAttemptNumber);
+  }
+  
+  /**
+   * Fabricates attempt info for an attempt that is not used from the trace.
+   *
+   * @param taskType
+   *          MAP or REDUCE.
+   * @param taskInfo
+   *          the task-level info to attach to the attempt.
+   * @param taskAttemptNumber
+   *          the attempt's index; drives the made-up outcome for maps.
+   * @param taskNumber
+   *          the task's index; used only to build the ID passed to
+   *          sanitizeTaskRuntime().
+   * @param locality
+   *          the locality group used to pick the map runtime CDF.
+   * @return the made-up attempt info.
+   * @throws IllegalArgumentException
+   *           if taskType is neither MAP nor REDUCE.
+   */
+  private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
+      int taskAttemptNumber, int taskNumber, int locality) {
+    if (taskType != TaskType.MAP && taskType != TaskType.REDUCE) {
+      throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+          + taskType);
+    }
+
+    if (taskType == TaskType.REDUCE) {
+      // TODO make up state
+      // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
+      // Until then, made-up reduce attempts always succeed and have no
+      // shuffle or sort time.
+      State reduceState = State.SUCCEEDED;
+      return new ReduceTaskAttemptInfo(reduceState, taskInfo, 0L, 0L,
+          makeUpReduceRuntime(reduceState));
+    }
+
+    // MAP: make up the outcome first, then a runtime matching that outcome.
+    State mapState =
+        makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
+    long rawRuntime = makeUpMapRuntime(mapState, locality);
+    String attemptName =
+        makeTaskAttemptID(taskType, taskNumber, taskAttemptNumber).toString();
+    long mapRuntime = sanitizeTaskRuntime(rawRuntime, attemptName);
+    return new MapTaskAttemptInfo(mapState, taskInfo, mapRuntime);
+  }
+
+  /**
+   * Makes up a reduce runtime, retrying up to five times until a non-negative
+   * value is drawn.
+   *
+   * @param state
+   *          the attempt outcome that selects the runtime distribution.
+   * @return the first non-negative value drawn, or 0 if all five draws were
+   *         negative.
+   */
+  private long makeUpReduceRuntime(State state) {
+    int triesLeft = 5;
+    while (triesLeft-- > 0) {
+      long candidate = doMakeUpReduceRuntime(state);
+      if (candidate >= 0) {
+        return candidate;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Draws a single reduce runtime from the job's successful or failed reduce
+   * attempt CDF.
+   *
+   * @param state
+   *          SUCCEEDED or FAILED.
+   * @return the drawn runtime, or 0 when the CDF has no values to draw from.
+   * @throws IllegalArgumentException
+   *           if state is neither SUCCEEDED nor FAILED.
+   */
+  private long doMakeUpReduceRuntime(State state) {
+    if (state != State.SUCCEEDED && state != State.FAILED) {
+      throw new IllegalArgumentException(
+          "state is neither SUCCEEDED nor FAILED: " + state);
+    }
+    try {
+      return (state == State.SUCCEEDED)
+          ? makeUpRuntime(job.getSuccessfulReduceAttemptCDF())
+          : makeUpRuntime(job.getFailedReduceAttemptCDF());
+    } catch (NoValueToMakeUpRuntime e) {
+      // An empty CDF is not an error here; fall back to a zero runtime.
+      return 0;
+    }
+  }
+
+  /**
+   * Makes up a map runtime for the given outcome and locality group.
+   *
+   * @param state
+   *          SUCCEEDED or FAILED; selects the successful or failed CDF list.
+   * @param locality
+   *          index of the CDF to try first within the selected list.
+   * @return a runtime drawn from the CDF at that index, or from a weighted
+   *         choice over the whole list when that CDF has no values.
+   * @throws IllegalArgumentException
+   *           if state is neither SUCCEEDED nor FAILED.
+   */
+  private long makeUpMapRuntime(State state, int locality) {
+    if (state != State.SUCCEEDED && state != State.FAILED) {
+      throw new IllegalArgumentException(
+          "state is neither SUCCEEDED nor FAILED: " + state);
+    }
+
+    List<LoggedDiscreteCDF> cdfsByDistance = (state == State.SUCCEEDED)
+        ? job.getSuccessfulMapAttemptCDFs()
+        : job.getFailedMapAttemptCDFs();
+
+    // XXX The map CDF list has 4 possible groups: distance=0, 1, 2, and a
+    // last group for "distance cannot be determined". All pig jobs would have
+    // only the 4th group, and pig tasks usually do not have any locality, so
+    // that group should count as "distance=2". However, setup/cleanup tasks
+    // are also counted in the 4th group, and those tasks do not make sense.
+    try {
+      return makeUpRuntime(cdfsByDistance.get(locality));
+    } catch (NoValueToMakeUpRuntime e) {
+      // Nothing recorded for this locality: sample across all groups instead.
+      return makeUpRuntime(cdfsByDistance);
+    }
+  }
+
+  /**
+   * Picks one CDF from the list at random, weighting each CDF by its
+   * getNumberValues(), and draws a runtime from the CDF picked.
+   * 
+   * @param mapAttemptCDFs
+   *          the candidate CDFs of map attempt runtimes.
+   * @return the drawn runtime, or -1 when the CDFs hold no values in total.
+   */
+  private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
+    int totalValues = 0;
+    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+      totalValues += cdf.getNumberValues();
+    }
+    if (totalValues == 0) {
+      return -1;
+    }
+
+    // Walk the list, consuming each CDF's weight until the pick lands in one.
+    int remaining = random.nextInt(totalValues);
+    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+      if (remaining < cdf.getNumberValues()) {
+        if (remaining < 0) {
+          throw new IllegalStateException("application error");
+        }
+        return makeUpRuntime(cdf);
+      }
+      remaining -= cdf.getNumberValues();
+    }
+    throw new IllegalStateException("not possible to get here");
+  }
+
+  /**
+   * Draws a runtime from a single CDF.
+   *
+   * With a random seed, the whole draw runs under the interpolatorMap lock so
+   * that only one interpolator is generated per LoggedDiscreteCDF. Without a
+   * seed, the potentially lengthy construction of an interpolator happens
+   * outside the lock, and makeUpRuntimeCore only locks around its two map
+   * accesses.
+   *
+   * @param loggedDiscreteCDF
+   *          the CDF to draw from.
+   * @return the value produced by makeUpRuntimeCore().
+   */
+  private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
+    if (!hasRandomSeed) {
+      return makeUpRuntimeCore(loggedDiscreteCDF);
+    }
+    synchronized (interpolatorMap) {
+      return makeUpRuntimeCore(loggedDiscreteCDF);
+    }
+  }
+
+  /**
+   * Draws one value from the given CDF, creating and caching a random
+   * generator for that CDF in interpolatorMap on first use.
+   *
+   * @param loggedDiscreteCDF
+   *          the CDF to sample from; also the cache key.
+   * @return the value returned by the generator's randomValue().
+   * @throws NoValueToMakeUpRuntime
+   *           if no generator is cached yet and the CDF reports zero values.
+   */
+  private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
+    CDFRandomGenerator interpolator;
+
+    // Cache lookup; the lock covers only the map access.
+    synchronized (interpolatorMap) {
+      interpolator = interpolatorMap.get(loggedDiscreteCDF);
+    }
+
+    if (interpolator == null) {
+      if (loggedDiscreteCDF.getNumberValues() == 0) {
+        throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
+      }
+
+      // With a seed, each new generator gets the next seed value (++seed).
+      interpolator =
+          hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
+              loggedDiscreteCDF, ++seed)
+              : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
+
+      /*
+       * It doesn't matter if we compute and store an interpolator twice because
+       * the two instances will be semantically identical and stateless, unless
+       * we're seeded, in which case we're not stateless but this code is
+       * called while the caller already holds the interpolatorMap lock.
+       */
+      synchronized (interpolatorMap) {
+        interpolatorMap.put(loggedDiscreteCDF, interpolator);
+      }
+    }
+
+    return interpolator.randomValue();
+  }
+
+  /**
+   * Thrown when a runtime has to be made up from a CDF that contains no
+   * values. It is an IllegalArgumentException subtype so that callers can
+   * catch this case separately from other argument errors.
+   */
+  private static class NoValueToMakeUpRuntime extends IllegalArgumentException {
+    static final long serialVersionUID = 1L;
+
+    /** Creates the exception without a message or cause. */
+    NoValueToMakeUpRuntime() {
+    }
+
+    /** Creates the exception wrapping the given cause. */
+    NoValueToMakeUpRuntime(Throwable cause) {
+      super(cause);
+    }
+
+    /** Creates the exception with the given detail message. */
+    NoValueToMakeUpRuntime(String detailMessage) {
+      super(detailMessage);
+    }
+
+    /** Creates the exception with the given detail message and cause. */
+    NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
+      super(detailMessage, cause);
+    }
+  }
+
+  private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
+    if (taskAttemptNumber >= numAttempts.length - 1) {
+      // always succeed
+      return State.SUCCEEDED;
+    } else {
+      double pSucceed = numAttempts[taskAttemptNumber];
+      double pFail = 0;
+      for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
+        pFail += numAttempts[i];
+      }
+      return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
+          : State.FAILED;
+    }
+  }
+
+  /**
+   * Builds a task ID under a default-constructed JobID, so that the result
+   * depends only on the task type and number.
+   *
+   * @param taskType
+   *          the task type; anything other than TaskType.MAP is treated as a
+   *          non-map task.
+   * @param taskNumber
+   *          the task's index.
+   * @return the masked task ID.
+   */
+  private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
+    JobID maskedJobId = new JobID();
+    boolean isMap = (taskType == TaskType.MAP);
+    return new TaskID(maskedJobId, isMap, taskNumber);
+  }
+
+  /**
+   * Looks up the logged task for the given type and number.
+   *
+   * @param taskType
+   *          the task type.
+   * @param taskNumber
+   *          the task's index.
+   * @return the entry in loggedTaskMap under the masked task ID, or null if
+   *         there is none.
+   */
+  private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
+    // buildMaps() is called before every lookup in loggedTaskMap.
+    buildMaps();
+    TaskID maskedId = getMaskedTaskID(taskType, taskNumber);
+    return loggedTaskMap.get(maskedId);
+  }
+
+  /**
+   * Looks up the logged attempt for the given task type, task number and
+   * attempt number.
+   *
+   * @param taskType
+   *          the task type.
+   * @param taskNumber
+   *          the task's index.
+   * @param taskAttemptNumber
+   *          the attempt's index within the task.
+   * @return the entry in loggedTaskAttemptMap under the masked attempt ID, or
+   *         null if there is none.
+   */
+  private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
+      int taskNumber, int taskAttemptNumber) {
+    // buildMaps() is called before every lookup in loggedTaskAttemptMap.
+    buildMaps();
+    TaskID maskedTaskId = getMaskedTaskID(taskType, taskNumber);
+    return loggedTaskAttemptMap.get(
+        new TaskAttemptID(maskedTaskId, taskAttemptNumber));
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Produces {@link JobStory}s from a job trace, one {@link ZombieJob} per
+ * logged job read from the trace.
+ */
+public class ZombieJobProducer implements JobStoryProducer {
+  private final JobTraceReader reader;
+  private final ZombieCluster cluster;
+
+  /** Shared initialization for the public constructors. */
+  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster) {
+    this.cluster = cluster;
+    this.reader = reader;
+  }
+
+  /**
+   * Creates a producer that reads the trace from a file.
+   * 
+   * @param path
+   *          Path to the JSON trace file, possibly compressed.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param conf
+   *          The configuration handed to the {@link JobTraceReader} that opens
+   *          the trace.
+   * @throws IOException
+   *           if the {@link JobTraceReader} cannot be created.
+   */
+  public ZombieJobProducer(Path path, ZombieCluster cluster, Configuration conf)
+      throws IOException {
+    this(new JobTraceReader(path, conf), cluster);
+  }
+
+  /**
+   * Creates a producer that reads the trace from a stream.
+   * 
+   * @param input
+   *          The input stream for the JSON trace.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @throws IOException
+   *           if the {@link JobTraceReader} cannot be created.
+   */
+  public ZombieJobProducer(InputStream input, ZombieCluster cluster)
+      throws IOException {
+    this(new JobTraceReader(input), cluster);
+  }
+
+  /**
+   * Reads the next logged job from the trace and wraps it in a
+   * {@link ZombieJob}.
+   *
+   * @return the next job, or null when the reader has no more jobs.
+   * @throws IOException
+   *           if reading the trace fails.
+   */
+  @Override
+  public ZombieJob getNextJob() throws IOException {
+    LoggedJob loggedJob = reader.getNext();
+    if (loggedJob == null) {
+      return null;
+    }
+    return new ZombieJob(loggedJob, cluster);
+  }
+
+  /**
+   * Closes the underlying trace reader.
+   *
+   * @throws IOException
+   *           if closing the reader fails.
+   */
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}