Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC
svn commit: r1077079 [11/11] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/
src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/
src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/
src/contrib/g...
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link Node} represents a node in the cluster topology. A node can be a
+ * {@link MachineNode}, or a {@link RackNode}, etc.
+ */
+public class Node implements Comparable<Node> {
+ private static final SortedSet<Node> EMPTY_SET =
+ Collections.unmodifiableSortedSet(new TreeSet<Node>());
+ private Node parent;
+ private final String name;
+ private final int level;
+ private SortedSet<Node> children;
+
+ /**
+ * @param name
+ * A unique name to identify a node in the cluster.
+ * @param level
+ * The level of the node in the cluster.
+ */
+ public Node(String name, int level) {
+ if (name == null) {
+ throw new IllegalArgumentException("Node name cannot be null");
+ }
+
+ if (level < 0) {
+ throw new IllegalArgumentException("Level cannot be negative");
+ }
+
+ this.name = name;
+ this.level = level;
+ }
+
+ /**
+ * Get the name of the node.
+ *
+ * @return The name of the node.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the level of the node.
+ * @return The level of the node.
+ */
+ public int getLevel() {
+ return level;
+ }
+
+ private void checkChildren() {
+ if (children == null) {
+ children = new TreeSet<Node>();
+ }
+ }
+
+ /**
+ * Add a child node to this node.
+ * @param child The child node to be added. The child node must not
+ * currently belong to another cluster topology.
+ * @return Boolean indicating whether the node was successfully added.
+ */
+ public synchronized boolean addChild(Node child) {
+ if (child.parent != null) {
+ throw new IllegalArgumentException(
+ "The child is already under another node:" + child.parent);
+ }
+ checkChildren();
+ boolean retval = children.add(child);
+ if (retval) child.parent = this;
+ return retval;
+ }
+
+ /**
+ * Does this node have any children?
+ * @return Boolean indicating whether this node has any children.
+ */
+ public synchronized boolean hasChildren() {
+ return children != null && !children.isEmpty();
+ }
+
+ /**
+ * Get the children of this node.
+ *
+ * @return The children of this node. If no child, an empty set will be
+ * returned. The returned set is read-only.
+ */
+ public synchronized Set<Node> getChildren() {
+ return (children == null) ? EMPTY_SET :
+ Collections.unmodifiableSortedSet(children);
+ }
+
+ /**
+ * Get the parent node.
+ * @return the parent node, or null if this node is the root.
+ */
+ public Node getParent() {
+ return parent;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (obj.getClass() != this.getClass())
+ return false;
+ Node other = (Node) obj;
+ return name.equals(other.name);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + name +", " + level +")";
+ }
+
+ @Override
+ public int compareTo(Node o) {
+ return name.compareTo(o.name);
+ }
+}
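
For reference, a minimal usage sketch of the Node class above; the node names
and levels here are illustrative, not part of the commit:

    Node root = new Node("cluster", 0);
    Node rack = new Node("rack0", 1);
    Node host = new Node("node17", 2);
    root.addChild(rack);   // rack.getParent() is now root
    rack.addChild(host);
    // root.getChildren() returns a read-only set containing rack; adding
    // host under a second parent would throw IllegalArgumentException.
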
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+class Pair<CarType, CdrType> {
+ private final CarType car;
+ private final CdrType cdr;
+
+ Pair(CarType car, CdrType cdr) {
+ super();
+
+ this.car = car;
+ this.cdr = cdr;
+ }
+
+ CarType first() {
+ return car;
+ }
+
+ CdrType second() {
+ return cdr;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Element;
+import org.w3c.dom.Text;
+
+import org.xml.sax.SAXException;
+
+class ParsedConfigFile {
+ private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+ private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+
+ final int heapMegabytes;
+
+ final String queue;
+ final String jobName;
+
+ final int clusterMapMB;
+ final int clusterReduceMB;
+ final int jobMapMB;
+ final int jobReduceMB;
+
+ final String jobID;
+
+ final boolean valid;
+
+ private int maybeGetIntValue(String propName, String attr, String value,
+ int oldValue) {
+ if (propName.equals(attr) && value != null) {
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ return oldValue;
+ }
+ }
+
+ return oldValue;
+ }
+
+ @SuppressWarnings("hiding")
+ ParsedConfigFile(String filenameLine, String xmlString) {
+ super();
+
+ int heapMegabytes = -1;
+
+ String queue = null;
+ String jobName = null;
+
+ int clusterMapMB = -1;
+ int clusterReduceMB = -1;
+ int jobMapMB = -1;
+ int jobReduceMB = -1;
+
+ String jobID = null;
+
+ boolean valid = true;
+
+ Matcher jobIDMatcher = jobIDPattern.matcher(filenameLine);
+
+ if (jobIDMatcher.find()) {
+ jobID = jobIDMatcher.group(1);
+ }
+
+ try {
+ InputStream is = new ByteArrayInputStream(xmlString.getBytes());
+
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+ DocumentBuilder db = dbf.newDocumentBuilder();
+
+ Document doc = db.parse(is);
+
+ Element root = doc.getDocumentElement();
+
+ if (!"configuration".equals(root.getTagName())) {
+ System.out.print("root is not a configuration node");
+ valid = false;
+ }
+
+ NodeList props = root.getChildNodes();
+
+ for (int i = 0; i < props.getLength(); ++i) {
+ Node propNode = props.item(i);
+ if (!(propNode instanceof Element))
+ continue;
+ Element prop = (Element) propNode;
+ if (!"property".equals(prop.getTagName())) {
+ System.out.print("bad conf file: element not <property>");
+ }
+ NodeList fields = prop.getChildNodes();
+ String attr = null;
+ String value = null;
+ @SuppressWarnings("unused")
+ boolean finalParameter = false;
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element)) {
+ continue;
+ }
+
+ Element field = (Element) fieldNode;
+ if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
+ attr = ((Text) field.getFirstChild()).getData().trim();
+ }
+ if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+ value = ((Text) field.getFirstChild()).getData();
+ }
+ if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
+ finalParameter = "true".equals(((Text) field.getFirstChild())
+ .getData());
+ }
+ }
+ if ("mapred.child.java.opts".equals(attr) && value != null) {
+ Matcher matcher = heapPattern.matcher(value);
+ if (matcher.find()) {
+ String heapSize = matcher.group(1);
+
+ heapMegabytes = Integer.parseInt(heapSize);
+
+ if (matcher.group(2).equalsIgnoreCase("G")) {
+ heapMegabytes *= 1024;
+ }
+ }
+ }
+
+ if ("mapred.job.queue.name".equals(attr) && value != null) {
+ queue = value;
+ }
+
+ if ("mapred.job.name".equals(attr) && value != null) {
+ jobName = value;
+ }
+
+ clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
+ value, clusterMapMB);
+ clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
+ attr, value, clusterReduceMB);
+ jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
+ jobMapMB);
+ jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
+ value, jobReduceMB);
+ }
+
+ // Parse succeeded; 'valid' retains any failure recorded above.
+ } catch (ParserConfigurationException e) {
+ valid = false;
+ } catch (SAXException e) {
+ valid = false;
+ } catch (IOException e) {
+ valid = false;
+ }
+
+ this.heapMegabytes = heapMegabytes;
+
+ this.queue = queue;
+ this.jobName = jobName;
+
+ this.clusterMapMB = clusterMapMB;
+ this.clusterReduceMB = clusterReduceMB;
+ this.jobMapMB = jobMapMB;
+ this.jobReduceMB = jobReduceMB;
+
+ this.jobID = jobID;
+
+ this.valid = valid;
+ }
+}
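
A sketch of how ParsedConfigFile is driven; the file name and XML below are
made up, and only the _job_<id>_ and -Xmx patterns in them matter:

    String filenameLine =
        "hdfs://host/logs/host_1234_job_200903042607_0001_conf.xml";
    String xml = "<configuration><property>"
        + "<name>mapred.child.java.opts</name><value>-Xmx1g</value>"
        + "</property></configuration>";
    ParsedConfigFile parsed = new ParsedConfigFile(filenameLine, xml);
    // parsed.jobID == "job_200903042607_0001", parsed.heapMegabytes == 1024,
    // parsed.valid == true; fields not present in the XML keep their
    // -1/null defaults.
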
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+class ParsedHost {
+ private final String rackName;
+ private final String nodeName;
+
+ /**
+ * TODO the following only works for the /rack/host format. Change to support
+ * arbitrary levels of network names.
+ */
+ private static final Pattern splitPattern = Pattern
+ .compile("/([^/]+)/([^/]+)");
+
+ /**
+ * TODO handle arbitrary levels of network names.
+ */
+ static int numberOfDistances() {
+ return 3;
+ }
+
+ String nameComponent(int i) throws IllegalArgumentException {
+ switch (i) {
+ case 0:
+ return rackName;
+
+ case 1:
+ return nodeName;
+
+ default:
+ throw new IllegalArgumentException(
+ "Host location component index out of range.");
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return rackName.hashCode() * 17 + nodeName.hashCode();
+ }
+
+ public static ParsedHost parse(String name) {
+ // separate out the node name
+ Matcher matcher = splitPattern.matcher(name);
+
+ if (!matcher.matches())
+ return null;
+
+ return new ParsedHost(matcher.group(1), matcher.group(2));
+ }
+
+ public ParsedHost(LoggedLocation loc) {
+ List<String> coordinates = loc.getLayers();
+
+ rackName = coordinates.get(0);
+ nodeName = coordinates.get(1);
+ }
+
+ LoggedLocation makeLoggedLocation() {
+ LoggedLocation result = new LoggedLocation();
+
+ List<String> coordinates = new ArrayList<String>();
+
+ coordinates.add(rackName);
+ coordinates.add(nodeName);
+
+ result.setLayers(coordinates);
+
+ return result;
+ }
+
+ String getNodeName() {
+ return nodeName;
+ }
+
+ String getRackName() {
+ return rackName;
+ }
+
+ // expects the broadest name first
+ ParsedHost(String rackName, String nodeName) {
+ this.rackName = rackName;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof ParsedHost)) {
+ return false;
+ }
+ ParsedHost host = (ParsedHost) other;
+ return (nodeName.equals(host.nodeName) && rackName.equals(host.rackName));
+ }
+
+ int distance(ParsedHost other) {
+ if (nodeName.equals(other.nodeName)) {
+ return 0;
+ }
+
+ if (rackName.equals(other.rackName)) {
+ return 1;
+ }
+
+ return 2;
+ }
+}
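
A quick sketch of the /rack/host parsing and distance metric above; the host
names are illustrative:

    ParsedHost a = ParsedHost.parse("/rack1/node17");
    ParsedHost b = ParsedHost.parse("/rack1/node23");
    ParsedHost c = ParsedHost.parse("/rack2/node05");
    // a.distance(a) == 0 (same node), a.distance(b) == 1 (same rack),
    // a.distance(c) == 2 (off rack); parse() returns null for any name
    // not in /rack/host form.
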
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+class ParsedLine {
+ Properties content;
+ LogRecordType type;
+
+ static final Pattern keyValPair = Pattern
+ .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+
+ @SuppressWarnings("unused")
+ ParsedLine(String fullLine, int version) {
+ super();
+
+ content = new Properties();
+
+ int firstSpace = fullLine.indexOf(" ");
+
+ if (firstSpace < 0) {
+ firstSpace = fullLine.length();
+ }
+
+ if (firstSpace == 0) {
+ return; // This is a junk line of some sort
+ }
+
+ type = LogRecordType.intern(fullLine.substring(0, firstSpace));
+
+ String propValPairs = fullLine.substring(firstSpace + 1);
+
+ while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
+ propValPairs = propValPairs.substring(1);
+ }
+
+ int cursor = 0;
+
+ while (cursor < propValPairs.length()) {
+ int equals = propValPairs.indexOf('=', cursor);
+
+ if (equals < 0) {
+ // maybe we do some error processing
+ return;
+ }
+
+ int nextCursor;
+
+ int endValue;
+
+ if (propValPairs.charAt(equals + 1) == '\"') {
+ int closeQuote = propValPairs.indexOf('\"', equals + 2);
+
+ nextCursor = closeQuote + 1;
+
+ endValue = closeQuote;
+
+ if (closeQuote < 0) {
+ endValue = propValPairs.length();
+
+ nextCursor = endValue;
+ }
+ } else {
+ int closeSpace = propValPairs.indexOf(' ', equals + 1);
+
+ if (closeSpace < 0) {
+ closeSpace = propValPairs.length();
+ }
+
+ endValue = closeSpace;
+
+ nextCursor = endValue;
+ }
+
+ content.setProperty(propValPairs.substring(cursor, equals), propValPairs
+ .substring(equals + 2, endValue));
+
+ cursor = nextCursor;
+
+ while (cursor < propValPairs.length()
+ && propValPairs.charAt(cursor) == ' ') {
+ ++cursor;
+ }
+ }
+ }
+
+ protected LogRecordType getType() {
+ return type;
+ }
+
+ protected String get(String key) {
+ return content.getProperty(key);
+ }
+
+ protected long getLong(String key) {
+ String val = get(key);
+
+ return Long.parseLong(val);
+ }
+}
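
A sketch of the key="value" record format this parser expects; the record
below is fabricated in the pre-0.21 job-history style:

    ParsedLine line = new ParsedLine(
        "Job JOBID=\"job_200903042607_0001\" JOBNAME=\"word count\"", 0);
    // line.getType() is the interned LogRecordType for "Job";
    // line.get("JOBID") returns "job_200903042607_0001" and
    // line.get("JOBNAME") returns "word count".
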
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Constants used in pre-0.21 Hadoop job history files.
+ */
+public class Pre21JobHistoryConstants {
+
+ /**
+ * Job history files contain key="value" pairs, where keys belong to this enum.
+ * It acts as a global namespace for all keys.
+ */
+ static enum Keys {
+ JOBTRACKERID,
+ START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+ LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+ FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+ ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+ SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+ JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
+ }
+
+ /**
+ * This enum contains some of the values commonly used by history log events.
+ * Since values in history can only be strings, Values.name() is used in
+ * most places in the history file.
+ */
+ public static enum Values {
+ SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
+ }
+
+}
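
Because history values are recorded as strings, a consumer typically compares
against Values.name(); a one-line illustration:

    String status = Pre21JobHistoryConstants.Values.SUCCESS.name(); // "SUCCESS"
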
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+
+/**
+ * {@link RackNode} represents a rack node in the cluster topology.
+ */
+public final class RackNode extends Node {
+ public RackNode(String name, int level) {
+ // Hack: ensuring rack name starts with "/".
+ super(name.startsWith("/") ? name : "/" + name, level);
+ }
+
+ @Override
+ public synchronized boolean addChild(Node child) {
+ if (!(child instanceof MachineNode)) {
+ throw new IllegalArgumentException(
+ "Only MachineNode can be added to RackNode");
+ }
+ return super.addChild(child);
+ }
+
+ /**
+ * Get the machine nodes that belong to the rack.
+ * @return The machine nodes that belong to the rack.
+ */
+ @SuppressWarnings({ "cast", "unchecked" })
+ public Set<MachineNode> getMachinesInRack() {
+ return (Set<MachineNode>)(Set)getChildren();
+ }
+}
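
A sketch combining RackNode with the MachineNode builder (the Builder API is
assumed from its usage in ZombieCluster later in this commit):

    RackNode rack = new RackNode("rack0", 1);  // name is normalized to "/rack0"
    MachineNode host = new MachineNode.Builder("node17", 2).build();
    rack.addChild(host);  // any non-MachineNode child would throw
    Set<MachineNode> machines = rack.getMachinesInRack();
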
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link ReduceTaskAttemptInfo} represents the information with regard to a
+ * reduce task attempt.
+ */
+public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
+ private long shuffleTime;
+ private long mergeTime;
+ private long reduceTime;
+
+ public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+ long mergeTime, long reduceTime) {
+ super(state, taskInfo);
+ this.shuffleTime = shuffleTime;
+ this.mergeTime = mergeTime;
+ this.reduceTime = reduceTime;
+ }
+
+ /**
+ * Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
+ *
+ * @return the runtime for the <b>reduce</b> phase of the reduce task-attempt
+ */
+ public long getReduceRuntime() {
+ return reduceTime;
+ }
+
+ /**
+ * Get the runtime for the <b>shuffle</b> phase of the reduce task-attempt.
+ *
+ * @return the runtime for the <b>shuffle</b> phase of the reduce task-attempt
+ */
+ public long getShuffleRuntime() {
+ return shuffleTime;
+ }
+
+ /**
+ * Get the runtime for the <b>merge</b> phase of the reduce task-attempt
+ *
+ * @return the runtime for the <b>merge</b> phase of the reduce task-attempt
+ */
+ public long getMergeRuntime() {
+ return mergeTime;
+ }
+
+ @Override
+ public long getRuntime() {
+ return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
+ }
+
+}
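
A sketch showing how the three phase runtimes combine; all numbers are
arbitrary milliseconds, and TaskInfo is the class added later in this commit:

    TaskInfo info = new TaskInfo(1 << 20, 1000, 1 << 19, 500, 1 << 28);
    ReduceTaskAttemptInfo attempt = new ReduceTaskAttemptInfo(
        State.SUCCEEDED, info, 3000L, 1000L, 5000L);
    // attempt.getRuntime() == 9000: shuffle (3000) + merge (1000) + reduce (5000)
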
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link TaskAttemptInfo} is a collection of statistics about a particular
+ * task-attempt gleaned from job-history of the job.
+ */
+public abstract class TaskAttemptInfo {
+ protected final State state;
+ protected final TaskInfo taskInfo;
+
+ protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+ if (state == State.SUCCEEDED || state == State.FAILED) {
+ this.state = state;
+ } else {
+ throw new IllegalArgumentException("status cannot be " + state);
+ }
+ this.taskInfo = taskInfo;
+ }
+
+ /**
+ * Get the final {@link TaskStatus.State} of the task-attempt.
+ *
+ * @return the final <code>State</code> of the task-attempt
+ */
+ public State getRunState() {
+ return state;
+ }
+
+ /**
+ * Get the total runtime for the task-attempt.
+ *
+ * @return the total runtime for the task-attempt
+ */
+ public abstract long getRuntime();
+
+ /**
+ * Get the {@link TaskInfo} for the given task-attempt.
+ *
+ * @return the <code>TaskInfo</code> for the given task-attempt
+ */
+ public TaskInfo getTaskInfo() {
+ return taskInfo;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+public class TaskInfo {
+ private final long bytesIn;
+ private final int recsIn;
+ private final long bytesOut;
+ private final int recsOut;
+ private final long maxMemory;
+
+ public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+ long maxMemory) {
+ this.bytesIn = bytesIn;
+ this.recsIn = recsIn;
+ this.bytesOut = bytesOut;
+ this.recsOut = recsOut;
+ this.maxMemory = maxMemory;
+ }
+
+ /**
+ * @return Raw bytes read from the FileSystem into the task. Note that this
+ * may not always match the input bytes to the task.
+ */
+ public long getInputBytes() {
+ return bytesIn;
+ }
+
+ /**
+ * @return Number of records input to this task.
+ */
+ public int getInputRecords() {
+ return recsIn;
+ }
+
+ /**
+ * @return Raw bytes written to the destination FileSystem. Note that this may
+ * not match output bytes.
+ */
+ public long getOutputBytes() {
+ return bytesOut;
+ }
+
+ /**
+ * @return Number of records output from this task.
+ */
+ public int getOutputRecords() {
+ return recsOut;
+ }
+
+ /**
+ * @return Memory used by the task, which is at most the heap size.
+ */
+ public long getTaskMemory() {
+ return maxMemory;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * This describes a path from a node to the root. We use it when we compare two
+ * trees during rumen unit tests. If the trees are not identical, this chain
+ * will be converted to a string which describes the path from the root to the
+ * fields that did not compare.
+ *
+ */
+public class TreePath {
+ final TreePath parent;
+
+ final String fieldName;
+
+ final int index;
+
+ public TreePath(TreePath parent, String fieldName) {
+ super();
+
+ this.parent = parent;
+ this.fieldName = fieldName;
+ this.index = -1;
+ }
+
+ public TreePath(TreePath parent, String fieldName, int index) {
+ super();
+
+ this.parent = parent;
+ this.fieldName = fieldName;
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ String mySegment = fieldName + (index == -1 ? "" : ("[" + index + "]"));
+
+ return ((parent == null) ? "" : parent.toString() + "-->") + mySegment;
+ }
+}
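
A sketch of the chained string form; the field names are arbitrary:

    TreePath root = new TreePath(null, "job");
    TreePath tasks = new TreePath(root, "mapTasks", 3);
    TreePath field = new TreePath(tasks, "startTime");
    // field.toString() == "job-->mapTasks[3]-->startTime"
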
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ZombieCluster} rebuilds the cluster topology using the information
+ * obtained from job history logs.
+ */
+public class ZombieCluster extends AbstractClusterStory {
+ private Node root;
+
+ /**
+ * Construct a homogeneous cluster. We assume that the leaves on the topology
+ * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+ * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+ *
+ * @param topology
+ * The network topology.
+ * @param defaultNode
+ * The default node setting.
+ */
+ ZombieCluster(LoggedNetworkTopology topology, MachineNode defaultNode) {
+ buildCluster(topology, defaultNode);
+ }
+
+ /**
+ * Construct a homogeneous cluster. We assume that the leaves on the topology
+ * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+ * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+ *
+ * @param path Path to the JSON-encoded topology file.
+ * @param conf The configuration for reading the topology file.
+ * @param defaultNode
+ * The default node setting.
+ * @throws IOException
+ */
+ public ZombieCluster(Path path, MachineNode defaultNode, Configuration conf) throws IOException {
+ this(new ClusterTopologyReader(path, conf).get(), defaultNode);
+ }
+
+ /**
+ * Construct a homogeneous cluster. We assume that the leaves on the topology
+ * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+ * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+ *
+ * @param input The input stream for the JSON-encoded topology file.
+ * @param defaultNode
+ * The default node setting.
+ * @throws IOException
+ */
+ public ZombieCluster(InputStream input, MachineNode defaultNode) throws IOException {
+ this(new ClusterTopologyReader(input).get(), defaultNode);
+ }
+
+ @Override
+ public Node getClusterTopology() {
+ return root;
+ }
+
+ private final void buildCluster(LoggedNetworkTopology topology,
+ MachineNode defaultNode) {
+ Map<LoggedNetworkTopology, Integer> levelMapping =
+ new IdentityHashMap<LoggedNetworkTopology, Integer>();
+ Deque<LoggedNetworkTopology> unvisited =
+ new ArrayDeque<LoggedNetworkTopology>();
+ unvisited.add(topology);
+ levelMapping.put(topology, 0);
+
+ // build levelMapping and determine leafLevel
+ int leafLevel = -1; // -1 means leafLevel unknown.
+ for (LoggedNetworkTopology n = unvisited.poll(); n != null;
+ n = unvisited.poll()) {
+ int level = levelMapping.get(n);
+ List<LoggedNetworkTopology> children = n.getChildren();
+ if (children == null || children.isEmpty()) {
+ if (leafLevel == -1) {
+ leafLevel = level;
+ } else if (leafLevel != level) {
+ throw new IllegalArgumentException(
+ "Leaf nodes are not on the same level");
+ }
+ } else {
+ for (LoggedNetworkTopology child : children) {
+ levelMapping.put(child, level + 1);
+ unvisited.addFirst(child);
+ }
+ }
+ }
+
+ /**
+ * A second-pass DFS traversal of the topology tree. path[i] contains the parent
+ * of the node at level i+1.
+ */
+ Node[] path = new Node[leafLevel];
+ unvisited.add(topology);
+ for (LoggedNetworkTopology n = unvisited.poll(); n != null;
+ n = unvisited.poll()) {
+ int level = levelMapping.get(n);
+ Node current;
+ if (level == leafLevel) { // a machine node
+ MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
+ if (defaultNode != null) {
+ builder.cloneFrom(defaultNode);
+ }
+ current = builder.build();
+ } else {
+ current = (level == leafLevel - 1)
+ ? new RackNode(n.getName(), level) :
+ new Node(n.getName(), level);
+ path[level] = current;
+ // Add all children to the front of the queue.
+ for (LoggedNetworkTopology child : n.getChildren()) {
+ unvisited.addFirst(child);
+ }
+ }
+ if (level != 0) {
+ path[level - 1].addChild(current);
+ }
+ }
+
+ root = path[0];
+ }
+}
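
A sketch of rebuilding a topology from a JSON trace; the path is hypothetical,
and defaultNode supplies the machine settings cloned onto each leaf:

    Configuration conf = new Configuration();
    MachineNode defaultNode = new MachineNode.Builder("default", 2).build();
    ZombieCluster cluster = new ZombieCluster(
        new Path("file:///tmp/topology-trace.json"), defaultNode, conf);
    Node root = cluster.getClusterTopology();  // same Node/RackNode tree as above
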
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,880 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+/**
+ * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects.
+ *
+ * Each {@link ZombieJob} object represents a job in job history. For everything
+ * that exists in job history, contents are returned faithfully and unchanged.
+ * When asked about a nonexistent task, a nonexistent task attempt, or an
+ * ill-formed task attempt, proper objects are made up from statistical
+ * sketches.
+ */
+@SuppressWarnings("deprecation")
+public class ZombieJob implements JobStory {
+ static final Log LOG = LogFactory.getLog(ZombieJob.class);
+ private final LoggedJob job;
+ private Map<TaskID, LoggedTask> loggedTaskMap;
+ private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap;
+ private final Random random;
+ private InputSplit[] splits;
+ private final ClusterStory cluster;
+ private JobConf jobConf;
+
+ private long seed;
+ private boolean hasRandomSeed = false;
+
+ private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
+ new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();
+
+ // TODO: Fix ZombieJob to initialize this correctly from observed data
+ double rackLocalOverNodeLocal = 1.5;
+ double rackRemoteOverNodeLocal = 3.0;
+
+ /**
+ * This constructor creates a {@link ZombieJob} with the same semantics as the
+ * {@link LoggedJob} passed as a parameter.
+ *
+ * @param job
+ * The dead job this ZombieJob instance is based on.
+ * @param cluster
+ * The cluster topology on which the dead job ran. This argument can
+ * be null if we do not have knowledge of the cluster topology.
+ * @param seed
+ * Seed for the random number generator for filling in information
+ * not available from the ZombieJob.
+ */
+ public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) {
+ if (job == null) {
+ throw new IllegalArgumentException("job is null");
+ }
+ this.job = job;
+ this.cluster = cluster;
+ random = new Random(seed);
+ this.seed = seed;
+ hasRandomSeed = true;
+ }
+
+ /**
+ * This constructor creates a {@link ZombieJob} with the same semantics as the
+ * {@link LoggedJob} passed as a parameter.
+ *
+ * @param job
+ * The dead job this ZombieJob instance is based on.
+ * @param cluster
+ * The cluster topology on which the dead job ran. This argument can
+ * be null if we do not have knowledge of the cluster topology.
+ */
+ public ZombieJob(LoggedJob job, ClusterStory cluster) {
+ this(job, cluster, System.nanoTime());
+ }
+
+ private static State convertState(Values status) {
+ if (status == Values.SUCCESS) {
+ return State.SUCCEEDED;
+ } else if (status == Values.FAILED) {
+ return State.FAILED;
+ } else if (status == Values.KILLED) {
+ return State.KILLED;
+ } else {
+ throw new IllegalArgumentException("unknown status " + status);
+ }
+ }
+
+ @Override
+ public synchronized JobConf getJobConf() {
+ if (jobConf == null) {
+ // TODO : add more to jobConf ?
+ jobConf = new JobConf();
+ jobConf.setJobName(getName());
+ jobConf.setUser(getUser());
+ jobConf.setNumMapTasks(getNumberMaps());
+ jobConf.setNumReduceTasks(getNumberReduces());
+ }
+ return jobConf;
+ }
+
+ @Override
+ public InputSplit[] getInputSplits() {
+ if (splits == null) {
+ List<InputSplit> splitsList = new ArrayList<InputSplit>();
+ Path emptyPath = new Path("/");
+ int totalHosts = 0; // used to determine avg # of hosts per split.
+ for (LoggedTask mapTask : job.getMapTasks()) {
+ Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType();
+ if (taskType != Pre21JobHistoryConstants.Values.MAP) {
+ LOG.warn("TaskType for a MapTask is not Map. task="
+ + mapTask.getTaskID() + " type="
+ + ((taskType == null) ? "null" : taskType.toString()));
+ continue;
+ }
+ List<LoggedLocation> locations = mapTask.getPreferredLocations();
+ List<String> hostList = new ArrayList<String>();
+ if (locations != null) {
+ for (LoggedLocation location : locations) {
+ List<String> layers = location.getLayers();
+ if (layers.size() == 0) {
+ LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
+ continue;
+ }
+ String host = layers.get(layers.size() - 1);
+ if (host == null) {
+ LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
+ continue;
+ }
+ hostList.add(host);
+ }
+ }
+ String[] hosts = hostList.toArray(new String[hostList.size()]);
+ totalHosts += hosts.length;
+ long mapInputBytes = getTaskInfo(mapTask).getInputBytes();
+ if (mapInputBytes < 0) {
+ LOG.warn("InputBytes for task "+mapTask.getTaskID()+" is not defined.");
+ mapInputBytes = 0;
+ }
+
+ splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts));
+ }
+
+ // If not all map tasks are in job trace, should make up some splits
+ // for missing map tasks.
+ int totalMaps = job.getTotalMaps();
+ if (totalMaps < splitsList.size()) {
+ LOG.warn("TotalMaps for job " + job.getJobID()
+ + " is less than the total number of map task descriptions ("
+ + totalMaps + "<" + splitsList.size() + ").");
+ }
+
+ int avgHostPerSplit;
+ if (splitsList.size() == 0) {
+ avgHostPerSplit = 3;
+ } else {
+ avgHostPerSplit = totalHosts / splitsList.size();
+ if (avgHostPerSplit == 0) {
+ avgHostPerSplit = 3;
+ }
+ }
+
+ for (int i = splitsList.size(); i < totalMaps; i++) {
+ if (cluster == null) {
+ splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
+ } else {
+ MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
+ String[] hosts = new String[mNodes.length];
+ for (int j = 0; j < hosts.length; ++j) {
+ hosts[j] = mNodes[j].getName();
+ }
+ // TODO set size of a split to 0 now.
+ splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
+ }
+ }
+
+ splits = splitsList.toArray(new InputSplit[splitsList.size()]);
+ }
+ return splits;
+ }
+
+ @Override
+ public String getName() {
+ String jobName = job.getJobName();
+ if (jobName == null) {
+ return "(name unknown)";
+ } else {
+ return jobName;
+ }
+ }
+
+ @Override
+ public JobID getJobID() {
+ return JobID.forName(getLoggedJob().getJobID());
+ }
+
+ private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
+ if (oldVal == -1) {
+ LOG.warn(name +" not defined for "+id);
+ return defaultVal;
+ }
+ return oldVal;
+ }
+
+ @Override
+ public int getNumberMaps() {
+ return sanitizeValue(job.getTotalMaps(), 0, "NumberMaps", job.getJobID());
+ }
+
+ @Override
+ public int getNumberReduces() {
+ return sanitizeValue(job.getTotalReduces(), 0, "NumberReduces", job.getJobID());
+ }
+
+ @Override
+ public Values getOutcome() {
+ return job.getOutcome();
+ }
+
+ @Override
+ public long getSubmissionTime() {
+ return job.getSubmitTime() - job.getRelativeTime();
+ }
+
+ /**
+ * Get the number of map tasks that are actually logged in the trace.
+ * @return The number of map tasks that are actually logged in the trace.
+ */
+ public int getNumLoggedMaps() {
+ return job.getMapTasks().size();
+ }
+
+
+ /**
+ * Get the number of reduce tasks that are actually logged in the trace.
+ * @return The number of reduce tasks that are actually logged in the trace.
+ */
+ public int getNumLoggedReduces() {
+ return job.getReduceTasks().size();
+ }
+
+ /**
+ * Mask the job ID part in a {@link TaskID}.
+ *
+ * @param taskId
+ * raw {@link TaskID} read from trace
+ * @return masked {@link TaskID} with empty {@link JobID}.
+ */
+ private TaskID maskTaskID(TaskID taskId) {
+ JobID jobId = new JobID();
+ return new TaskID(jobId, taskId.isMap(), taskId.getId());
+ }
+
+ /**
+ * Mask the job ID part in a {@link TaskAttemptID}.
+ *
+ * @param attemptId
+ * raw {@link TaskAttemptID} read from trace
+ * @return masked {@link TaskAttemptID} with empty {@link JobID}.
+ */
+ private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
+ JobID jobId = new JobID();
+ TaskID taskId = attemptId.getTaskID();
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(),
+ attemptId.isMap(), taskId.getId(), attemptId.getId());
+ }
+
+ private LoggedTask sanitizeLoggedTask(LoggedTask task) {
+ if (task == null) {
+ return null;
+ }
+ if (task.getTaskType() == null) {
+ LOG.warn("Task " + task.getTaskID() + " has nulll TaskType");
+ return null;
+ }
+ if (task.getTaskStatus() == null) {
+ LOG.warn("Task " + task.getTaskID() + " has nulll TaskStatus");
+ return null;
+ }
+ return task;
+ }
+
+ private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
+ if (attempt == null) {
+ return null;
+ }
+ if (attempt.getResult() == null) {
+ LOG.warn("TaskAttempt " + attempt.getResult() + " has nulll Result");
+ return null;
+ }
+
+ return attempt;
+ }
+
+ /**
+ * Build task mapping and task attempt mapping, to be later used to find
+ * information of a particular {@link TaskID} or {@link TaskAttemptID}.
+ */
+ private synchronized void buildMaps() {
+ if (loggedTaskMap == null) {
+ loggedTaskMap = new HashMap<TaskID, LoggedTask>();
+ loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
+
+ for (LoggedTask map : job.getMapTasks()) {
+ map = sanitizeLoggedTask(map);
+ if (map != null) {
+ loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
+
+ for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
+ mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
+ if (mapAttempt != null) {
+ TaskAttemptID id = TaskAttemptID.forName(mapAttempt
+ .getAttemptID());
+ loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
+ }
+ }
+ }
+ }
+ for (LoggedTask reduce : job.getReduceTasks()) {
+ reduce = sanitizeLoggedTask(reduce);
+ if (reduce != null) {
+ loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
+
+ for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
+ reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
+ if (reduceAttempt != null) {
+ TaskAttemptID id = TaskAttemptID.forName(reduceAttempt
+ .getAttemptID());
+ loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
+ }
+ }
+ }
+ }
+
+ // TODO: do not care about "other" tasks, "setup" or "clean"
+ }
+ }
+
+ @Override
+ public String getUser() {
+ String retval = job.getUser();
+ return (retval==null)?"(unknown)":retval;
+ }
+
+ /**
+ * Get the underlying {@link LoggedJob} object read directly from the trace.
+ * This is mainly for debugging.
+ *
+ * @return the underlying {@link LoggedJob} object
+ */
+ public LoggedJob getLoggedJob() {
+ return job;
+ }
+
+ /**
+ * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
+ * taskType, taskNumber, and taskAttemptNumber. This function does not consider
+ * locality, and applies the following decision logic: 1. Make up a
+ * {@link TaskAttemptInfo} if the task attempt is missing in trace, 2. Make up
+ * a {@link TaskAttemptInfo} if the task attempt has a KILLED final status in
+ * trace, 3. Otherwise (final state is SUCCEEDED or FAILED), construct the
+ * {@link TaskAttemptInfo} from the trace.
+ */
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+ int taskAttemptNumber) {
+ // Does not consider locality; assume default locality is NODE_LOCAL.
+ // But if both task and task attempt exist in trace, use logged locality.
+ int locality = 0;
+ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+ if (loggedTask == null) {
+ // TODO insert parameters
+ TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ }
+
+ LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+ taskNumber, taskAttemptNumber);
+ if (loggedAttempt == null) {
+ // Task exists, but attempt is missing.
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ } else {
+ // TODO should we handle killed attempts later?
+ if (loggedAttempt.getResult() == Values.KILLED) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ } else {
+ return getTaskAttemptInfo(loggedTask, loggedAttempt);
+ }
+ }
+ }
+
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return getTaskInfo(getLoggedTask(taskType, taskNumber));
+ }
+
+ /**
+ * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
+ * taskType, taskNumber, and taskAttemptNumber. This function considers
+ * locality, and applies the following decision logic: 1. Make up a
+ * {@link TaskAttemptInfo} if the task attempt is missing in trace, 2. Make up
+ * a {@link TaskAttemptInfo} if the task attempt has a KILLED final status in
+ * trace, 3. If final state is FAILED, construct a {@link TaskAttemptInfo}
+ * from the trace, without considering locality. 4. If final state is
+ * SUCCEEDED, construct a {@link TaskAttemptInfo} from the trace, with runtime
+ * scaled according to locality in simulation and locality in trace.
+ */
+ @Override
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+ int taskAttemptNumber, int locality) {
+ TaskType taskType = TaskType.MAP;
+ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+ if (loggedTask == null) {
+ // TODO insert parameters
+ TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ }
+ LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+ taskNumber, taskAttemptNumber);
+ if (loggedAttempt == null) {
+ // Task exists, but attempt is missing.
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ } else {
+ // Task and TaskAttempt both exist.
+ if (loggedAttempt.getResult() == Values.KILLED) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+ taskNumber, locality);
+ } else if (loggedAttempt.getResult() == Values.FAILED) {
+ /**
+ * A FAILED attempt is not affected by locality; however, made-up FAILED
+ * attempts ARE affected by locality, since statistics are present for
+ * attempts of different localities.
+ */
+ return getTaskAttemptInfo(loggedTask, loggedAttempt);
+ } else if (loggedAttempt.getResult() == Values.SUCCESS) {
+ int loggedLocality = getLocality(loggedTask, loggedAttempt);
+ if (locality == loggedLocality) {
+ return getTaskAttemptInfo(loggedTask, loggedAttempt);
+ } else {
+ // attempt succeeded in trace. It is scheduled in simulation with
+ // a different locality.
+ return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality,
+ rackLocalOverNodeLocal, rackRemoteOverNodeLocal);
+ }
+ } else {
+        throw new IllegalArgumentException(
+            "attempt result is not SUCCESS, FAILED or KILLED: "
+                + loggedAttempt.getResult());
+ }
+ }
+ }
+
+ private long sanitizeTaskRuntime(long time, String id) {
+ if (time < 0) {
+ LOG.warn("Negative running time for task "+id+": "+time);
+ return 100L; // set default to 100ms.
+ }
+ return time;
+ }
+
+ @SuppressWarnings("hiding")
+ private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
+ LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
+ double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ double[] factors = new double[] { 1.0, rackLocalOverNodeLocal,
+ rackRemoteOverNodeLocal };
+ double scaleFactor = factors[locality] / factors[loggedLocality];
+ State state = convertState(loggedAttempt.getResult());
+ if (loggedTask.getTaskType() == Values.MAP) {
+      long taskTime;
+ if (loggedAttempt.getStartTime() == 0) {
+ taskTime = makeUpMapRuntime(state, locality);
+ } else {
+ taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+ }
+ taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
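+      // Note: the compound assignment below truncates the scaled double
+      // value back to whole milliseconds (implicit narrowing to long).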
+ taskTime *= scaleFactor;
+ return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+ } else {
+ throw new IllegalArgumentException("taskType can only be MAP: "
+ + loggedTask.getTaskType());
+ }
+ }
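+
+  // Worked example (ratios assumed for illustration): with
+  // rackLocalOverNodeLocal = 1.2 and rackRemoteOverNodeLocal = 1.5, an
+  // attempt logged as NODE_LOCAL (loggedLocality = 0) that ran 10,000ms in
+  // the trace but is scheduled RACK_REMOTE (locality = 2) in simulation is
+  // scaled by factors[2] / factors[0] = 1.5, i.e. to 15,000ms.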
+
+ private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
+ int distance = cluster.getMaximumDistance();
+ String rackHostName = loggedAttempt.getHostName();
+ if (rackHostName == null) {
+ return distance;
+ }
+ MachineNode mn = getMachineNode(rackHostName);
+ if (mn == null) {
+ return distance;
+ }
+ List<LoggedLocation> locations = loggedTask.getPreferredLocations();
+ if (locations != null) {
+ for (LoggedLocation location : locations) {
+ List<String> layers = location.getLayers();
+ if ((layers == null) || (layers.isEmpty())) {
+ continue;
+ }
+        String dataNodeName = layers.get(layers.size() - 1);
+ MachineNode dataNode = cluster.getMachineByName(dataNodeName);
+ if (dataNode != null) {
+ distance = Math.min(distance, cluster.distance(mn, dataNode));
+ }
+ }
+ }
+ return distance;
+ }
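+
+  // Example (topology assumed for illustration): if the attempt ran on a
+  // host in the same rack as one of the task's preferred data nodes,
+  // cluster.distance(...) yields the rack-local distance; unknown hosts or
+  // missing preferred locations fall back to the maximum distance.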
+
+ private MachineNode getMachineNode(String rackHostName) {
+ ParsedHost parsedHost = ParsedHost.parse(rackHostName);
+ String hostName = (parsedHost == null) ? rackHostName
+ : parsedHost.getNodeName();
+ if (hostName == null) {
+ return null;
+ }
+ return (cluster == null) ? null : cluster.getMachineByName(hostName);
+ }
+
+ private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
+ LoggedTaskAttempt loggedAttempt) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ State state = convertState(loggedAttempt.getResult());
+ if (loggedTask.getTaskType() == Values.MAP) {
+ long taskTime;
+ if (loggedAttempt.getStartTime() == 0) {
+ int locality = getLocality(loggedTask, loggedAttempt);
+ taskTime = makeUpMapRuntime(state, locality);
+ } else {
+ taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+ }
+ taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
+ return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+ } else if (loggedTask.getTaskType() == Values.REDUCE) {
+ long startTime = loggedAttempt.getStartTime();
+ long mergeDone = loggedAttempt.getSortFinished();
+ long shuffleDone = loggedAttempt.getShuffleFinished();
+ long finishTime = loggedAttempt.getFinishTime();
+ if (startTime <= 0 || startTime >= finishTime) {
+        // We have seen startTime > finishTime in real traces. We have never
+        // seen a reduce task with startTime = 0, but if that happens, make
+        // up a reduceTime with no shuffle/merge phases.
+ long reduceTime = makeUpReduceRuntime(state);
+ return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+ } else {
+ if (shuffleDone <= 0) {
+ shuffleDone = startTime;
+ }
+ if (mergeDone <= 0) {
+ mergeDone = finishTime;
+ }
+ long shuffleTime = shuffleDone - startTime;
+ long mergeTime = mergeDone - shuffleDone;
+ long reduceTime = finishTime - mergeDone;
+ reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
+
+ return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
+ mergeTime, reduceTime);
+ }
+ } else {
+ throw new IllegalArgumentException("taskType for "
+ + loggedTask.getTaskID() + " is neither MAP nor REDUCE: "
+ + loggedTask.getTaskType());
+ }
+ }
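+
+  // Worked example (timestamps assumed for illustration): a reduce attempt
+  // with startTime = 1000, shuffleDone = 4000, mergeDone = 5000, and
+  // finishTime = 9000 yields shuffleTime = 3000, mergeTime = 1000, and
+  // reduceTime = 4000.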
+
+ private TaskInfo getTaskInfo(LoggedTask loggedTask) {
+ List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();
+
+ long inputBytes = -1;
+ long inputRecords = -1;
+ long outputBytes = -1;
+ long outputRecords = -1;
+ long heapMegabytes = -1;
+
+ Values type = loggedTask.getTaskType();
+ if ((type != Values.MAP) && (type != Values.REDUCE)) {
+ throw new IllegalArgumentException(
+ "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString()
+ + " for task = " + loggedTask.getTaskID());
+ }
+
+ for (LoggedTaskAttempt attempt : attempts) {
+ attempt = sanitizeLoggedTaskAttempt(attempt);
+ // ignore bad attempts or unsuccessful attempts.
+ if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
+ continue;
+ }
+
+ if (type == Values.MAP) {
+ inputBytes = attempt.getHdfsBytesRead();
+ inputRecords = attempt.getMapInputRecords();
+        outputBytes = (job.getTotalReduces() > 0)
+            ? attempt.getMapOutputBytes() : attempt.getHdfsBytesWritten();
+ outputRecords = attempt.getMapOutputRecords();
+        heapMegabytes = (job.getJobMapMB() > 0)
+            ? job.getJobMapMB() : job.getHeapMegabytes();
+ } else {
+ inputBytes = attempt.getReduceShuffleBytes();
+ inputRecords = attempt.getReduceInputRecords();
+ outputBytes = attempt.getHdfsBytesWritten();
+ outputRecords = attempt.getReduceOutputRecords();
+        heapMegabytes = (job.getJobReduceMB() > 0)
+            ? job.getJobReduceMB() : job.getHeapMegabytes();
+ }
+ break;
+ }
+
+    return new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
+        (int) outputRecords, (int) heapMegabytes);
+ }
+
+ private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
+ int taskAttemptNumber) {
+ return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
+ TaskType.MAP == taskType, taskNumber), taskAttemptNumber);
+ }
+
+ private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
+ int taskAttemptNumber, int taskNumber, int locality) {
+ if (taskType == TaskType.MAP) {
+      // make up a final state and a matching runtime
+      State state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
+      long runtime = makeUpMapRuntime(state, locality);
+      runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
+          taskNumber, taskAttemptNumber).toString());
+      return new MapTaskAttemptInfo(state, taskInfo, runtime);
+ } else if (taskType == TaskType.REDUCE) {
+      State state = State.SUCCEEDED;
+      long shuffleTime = 0;
+      long sortTime = 0;
+
+      // TODO make up state
+      // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
+      long reduceTime = makeUpReduceRuntime(state);
+      return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime, sortTime,
+          reduceTime);
+ }
+
+ throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+ + taskType);
+ }
+
+ private long makeUpReduceRuntime(State state) {
+ long reduceTime = 0;
+ for (int i = 0; i < 5; i++) {
+ reduceTime = doMakeUpReduceRuntime(state);
+ if (reduceTime >= 0) {
+ return reduceTime;
+ }
+ }
+ return 0;
+ }
+
+ private long doMakeUpReduceRuntime(State state) {
+ long reduceTime;
+ try {
+ if (state == State.SUCCEEDED) {
+ reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
+ } else if (state == State.FAILED) {
+ reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
+ } else {
+ throw new IllegalArgumentException(
+ "state is neither SUCCEEDED nor FAILED: " + state);
+ }
+ return reduceTime;
+ } catch (NoValueToMakeUpRuntime e) {
+ return 0;
+ }
+ }
+
+ private long makeUpMapRuntime(State state, int locality) {
+ long runtime;
+ // make up runtime
+ if (state == State.SUCCEEDED || state == State.FAILED) {
+ List<LoggedDiscreteCDF> cdfList =
+ state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
+ .getFailedMapAttemptCDFs();
+      // XXX MapCDFs is an ArrayList of 4 possible groups: distance=0, 1, 2,
+      // and a last group for "distance cannot be determined". All Pig jobs
+      // have only the 4th group, and Pig tasks usually have no locality, so
+      // this group should count as "distance=2". However, setup/cleanup
+      // tasks are also counted in the 4th group, and those tasks do not fit
+      // this locality model.
+ try {
+ runtime = makeUpRuntime(cdfList.get(locality));
+ } catch (NoValueToMakeUpRuntime e) {
+ runtime = makeUpRuntime(cdfList);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "state is neither SUCCEEDED nor FAILED: " + state);
+ }
+ return runtime;
+ }
+
+ /**
+ * Perform a weighted random selection on a list of CDFs, and produce a random
+ * variable using the selected CDF.
+ *
+ * @param mapAttemptCDFs
+ * A list of CDFs for the distribution of runtime for the 1st, 2nd,
+ * ... map attempts for the job.
+   * @return a runtime sampled from the selected CDF, or -1 if all CDFs are
+   *         empty.
+   */
+ private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
+ int total = 0;
+ for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+ total += cdf.getNumberValues();
+ }
+ if (total == 0) {
+ return -1;
+ }
+ int index = random.nextInt(total);
+ for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+ if (index >= cdf.getNumberValues()) {
+ index -= cdf.getNumberValues();
+ } else {
+        if (index < 0) {
+          throw new IllegalStateException(
+              "application error: residual index is negative: " + index);
+        }
+ return makeUpRuntime(cdf);
+ }
+ }
+ throw new IllegalStateException("not possible to get here");
+ }
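+
+  // Worked example (counts assumed for illustration): for CDFs holding 10,
+  // 5, and 5 data points, total = 20. A random index of 12 skips the first
+  // CDF (12 - 10 = 2) and selects the second, which then generates the
+  // runtime; a CDF's selection probability is proportional to its size.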
+
+ private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
+ /*
+ * We need this odd-looking code because if a seed exists we need to ensure
+ * that only one interpolator is generated per LoggedDiscreteCDF, but if no
+ * seed exists then the potentially lengthy process of making an
+ * interpolator can happen outside the lock. makeUpRuntimeCore only locks
+ * around the two hash map accesses.
+ */
+ if (hasRandomSeed) {
+ synchronized (interpolatorMap) {
+ return makeUpRuntimeCore(loggedDiscreteCDF);
+ }
+ }
+
+ return makeUpRuntimeCore(loggedDiscreteCDF);
+ }
+
+ private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
+ CDFRandomGenerator interpolator;
+
+ synchronized (interpolatorMap) {
+ interpolator = interpolatorMap.get(loggedDiscreteCDF);
+ }
+
+ if (interpolator == null) {
+ if (loggedDiscreteCDF.getNumberValues() == 0) {
+ throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
+ }
+
+ interpolator =
+ hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
+ loggedDiscreteCDF, ++seed)
+ : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
+
+ /*
+       * It doesn't matter if we compute and store an interpolator twice,
+       * because the two instances will be semantically identical and
+       * stateless. The exception is the seeded case, which is not stateless,
+       * but then this code is always called while holding the lock.
+ */
+ synchronized (interpolatorMap) {
+ interpolatorMap.put(loggedDiscreteCDF, interpolator);
+ }
+ }
+
+ return interpolator.randomValue();
+ }
+
+  private static class NoValueToMakeUpRuntime extends IllegalArgumentException {
+ static final long serialVersionUID = 1L;
+
+ NoValueToMakeUpRuntime() {
+ super();
+ }
+
+ NoValueToMakeUpRuntime(String detailMessage) {
+ super(detailMessage);
+ }
+
+ NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
+ super(detailMessage, cause);
+ }
+
+ NoValueToMakeUpRuntime(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
+ if (taskAttemptNumber >= numAttempts.length - 1) {
+ // always succeed
+ return State.SUCCEEDED;
+ } else {
+ double pSucceed = numAttempts[taskAttemptNumber];
+ double pFail = 0;
+ for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
+ pFail += numAttempts[i];
+ }
+ return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
+ : State.FAILED;
+ }
+ }
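+
+  // Worked example (distribution assumed for illustration): with
+  // numAttempts = {0.8, 0.15, 0.05}, attempt 0 succeeds with probability
+  // 0.8 / (0.8 + 0.2) = 0.8, and attempt 2 (the last slot) always succeeds.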
+
+ private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
+ return new TaskID(new JobID(), TaskType.MAP == taskType, taskNumber);
+ }
+
+ private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
+ buildMaps();
+ return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber));
+ }
+
+ private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
+ int taskNumber, int taskAttemptNumber) {
+ buildMaps();
+ TaskAttemptID id =
+ new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
+ taskAttemptNumber);
+ return loggedTaskAttemptMap.get(id);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Produces {@link JobStory}s from a job trace.
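+ *
+ * <p>A hedged usage sketch; the trace path is assumed for illustration, and
+ * a null cluster means the topology is unknown:
+ *
+ * <pre>
+ * JobStoryProducer producer = new ZombieJobProducer(
+ *     new Path("file:///tmp/job-trace.json"), null, new Configuration());
+ * for (JobStory story = producer.getNextJob(); story != null;
+ *     story = producer.getNextJob()) {
+ *   // consume the synthetic job, e.g. feed it to a simulator
+ * }
+ * producer.close();
+ * </pre>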
+ */
+public class ZombieJobProducer implements JobStoryProducer {
+ private final JobTraceReader reader;
+ private final ZombieCluster cluster;
+
+ private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster) {
+ this.reader = reader;
+ this.cluster = cluster;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param path
+ * Path to the JSON trace file, possibly compressed.
+ * @param cluster
+ * The topology of the cluster that corresponds to the jobs in the
+ * trace. The argument can be null if we do not have knowledge of the
+ * cluster topology.
+   * @param conf
+   *          The configuration used to open and read the trace file.
+   * @throws IOException
+   *           if the trace file cannot be opened or read.
+ */
+ public ZombieJobProducer(Path path, ZombieCluster cluster, Configuration conf)
+ throws IOException {
+ this(new JobTraceReader(path, conf), cluster);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param input
+ * The input stream for the JSON trace.
+ * @param cluster
+ * The topology of the cluster that corresponds to the jobs in the
+ * trace. The argument can be null if we do not have knowledge of the
+ * cluster topology.
+   * @throws IOException
+   *           if the trace cannot be read from the input stream.
+ */
+ public ZombieJobProducer(InputStream input, ZombieCluster cluster)
+ throws IOException {
+ this(new JobTraceReader(input), cluster);
+ }
+
+ @Override
+ public ZombieJob getNextJob() throws IOException {
+ LoggedJob job = reader.getNext();
+ return (job == null) ? null : new ZombieJob(job, cluster);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+}