You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/28 02:12:21 UTC
svn commit: r808686 [8/9] - in /hadoop/mapreduce/trunk: ./ ivy/
src/java/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/
src/test/tools/data/ src/test/tools/data/rumen/
src/test/tools/data/rumen/histogram-tests/...
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonAnySetter;
+
+import org.apache.hadoop.mapred.JobHistory;
+
+/**
+ * A {@link LoggedJob} is a representation of a hadoop job, with the details of
+ * this class set up to meet the requirements of the Jackson JSON
+ * parser/generator.
+ *
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ *
+ */
+public class LoggedJob implements DeepCompare {
+  public enum JobType {
+    JAVA, PIG, STREAMING, PIPES, OVERALL
+  };
+
+  public enum JobPriority {
+    VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH
+  };
+
+  // Names of unrecognized JSON attributes that have already been reported, so
+  // each name is reported at most once across all LoggedJob instances.
+  // NOTE(review): TreeSet is not synchronized -- confirm that JSON parsing of
+  // LoggedJob objects is single-threaded.
+  static private final TreeSet<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
+
+  // Numeric fields start at -1 (presumably meaning "not recorded"; confirm
+  // against the producers of these objects).
+  String jobID;
+  String user;
+  long computonsPerMapInputByte = -1L;
+  long computonsPerMapOutputByte = -1L;
+  long computonsPerReduceInputByte = -1L;
+  long computonsPerReduceOutputByte = -1L;
+  long submitTime = -1L;
+  long launchTime = -1L;
+  long finishTime = -1L;
+
+  int heapMegabytes = -1;
+  int totalMaps = -1;
+  int totalReduces = -1;
+  JobHistory.Values outcome = JobHistory.Values.SUCCESS;
+  JobType jobtype = JobType.JAVA;
+  JobPriority priority = JobPriority.NORMAL;
+
+  List<String> directDependantJobs = new ArrayList<String>();
+  List<LoggedTask> mapTasks = new ArrayList<LoggedTask>();
+  List<LoggedTask> reduceTasks = new ArrayList<LoggedTask>();
+  List<LoggedTask> otherTasks = new ArrayList<LoggedTask>();
+
+  // There are CDFs for each level of locality -- most local first
+  ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs;
+  // There are CDFs for each level of locality -- most local first
+  ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs;
+
+  LoggedDiscreteCDF successfulReduceAttemptCDF;
+  LoggedDiscreteCDF failedReduceAttemptCDF;
+
+  String queue = null;
+
+  String jobName = null;
+
+  int clusterMapMB = -1;
+  int clusterReduceMB = -1;
+  int jobMapMB = -1;
+  int jobReduceMB = -1;
+
+  long relativeTime = 0;
+
+  double[] mapperTriesToSucceed;
+  double failedMapperFraction;
+
+  LoggedJob() {
+  }
+
+  /**
+   * @param jobID
+   *          the ID of the job this object describes
+   */
+  LoggedJob(String jobID) {
+    // Assign directly rather than calling the overridable setJobID from a
+    // constructor.
+    this.jobID = jobID;
+  }
+
+  /**
+   * Receives JSON attributes that have no matching property (this method is
+   * registered with Jackson via {@link JsonAnySetter}). The value is discarded;
+   * the attribute name is printed to standard error the first time it is seen.
+   *
+   * @param attributeName
+   *          the unrecognized JSON attribute name
+   * @param ignored
+   *          the attribute's value, which is discarded
+   */
+  // TODO consider having default readers on the other objects
+  @JsonAnySetter
+  public void setUnknownAttribute(String attributeName, Object ignored) {
+    // Set.add returns false when the name has been seen before.
+    if (alreadySeenAnySetterAttributes.add(attributeName)) {
+      System.err.println("In LoggedJob, we saw the unknown attribute "
+          + attributeName + ".");
+    }
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getJobID() {
+    return jobID;
+  }
+
+  void setJobID(String jobID) {
+    this.jobID = jobID;
+  }
+
+  public JobPriority getPriority() {
+    return priority;
+  }
+
+  void setPriority(JobPriority priority) {
+    this.priority = priority;
+  }
+
+  public long getComputonsPerMapInputByte() {
+    return computonsPerMapInputByte;
+  }
+
+  void setComputonsPerMapInputByte(long computonsPerMapInputByte) {
+    this.computonsPerMapInputByte = computonsPerMapInputByte;
+  }
+
+  public long getComputonsPerMapOutputByte() {
+    return computonsPerMapOutputByte;
+  }
+
+  void setComputonsPerMapOutputByte(long computonsPerMapOutputByte) {
+    this.computonsPerMapOutputByte = computonsPerMapOutputByte;
+  }
+
+  public long getComputonsPerReduceInputByte() {
+    return computonsPerReduceInputByte;
+  }
+
+  void setComputonsPerReduceInputByte(long computonsPerReduceInputByte) {
+    this.computonsPerReduceInputByte = computonsPerReduceInputByte;
+  }
+
+  public long getComputonsPerReduceOutputByte() {
+    return computonsPerReduceOutputByte;
+  }
+
+  void setComputonsPerReduceOutputByte(long computonsPerReduceOutputByte) {
+    this.computonsPerReduceOutputByte = computonsPerReduceOutputByte;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  void setSubmitTime(long submitTime) {
+    this.submitTime = submitTime;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  void setLaunchTime(long startTime) {
+    this.launchTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public int getHeapMegabytes() {
+    return heapMegabytes;
+  }
+
+  void setHeapMegabytes(int heapMegabytes) {
+    this.heapMegabytes = heapMegabytes;
+  }
+
+  public int getTotalMaps() {
+    return totalMaps;
+  }
+
+  void setTotalMaps(int totalMaps) {
+    this.totalMaps = totalMaps;
+  }
+
+  public int getTotalReduces() {
+    return totalReduces;
+  }
+
+  void setTotalReduces(int totalReduces) {
+    this.totalReduces = totalReduces;
+  }
+
+  public JobHistory.Values getOutcome() {
+    return outcome;
+  }
+
+  void setOutcome(JobHistory.Values outcome) {
+    this.outcome = outcome;
+  }
+
+  public JobType getJobtype() {
+    return jobtype;
+  }
+
+  void setJobtype(JobType jobtype) {
+    this.jobtype = jobtype;
+  }
+
+  public List<String> getDirectDependantJobs() {
+    return directDependantJobs;
+  }
+
+  void setDirectDependantJobs(List<String> directDependantJobs) {
+    this.directDependantJobs = directDependantJobs;
+  }
+
+  public List<LoggedTask> getMapTasks() {
+    return mapTasks;
+  }
+
+  void setMapTasks(List<LoggedTask> mapTasks) {
+    this.mapTasks = mapTasks;
+  }
+
+  public List<LoggedTask> getReduceTasks() {
+    return reduceTasks;
+  }
+
+  void setReduceTasks(List<LoggedTask> reduceTasks) {
+    this.reduceTasks = reduceTasks;
+  }
+
+  public List<LoggedTask> getOtherTasks() {
+    return otherTasks;
+  }
+
+  void setOtherTasks(List<LoggedTask> otherTasks) {
+    this.otherTasks = otherTasks;
+  }
+
+  public ArrayList<LoggedDiscreteCDF> getSuccessfulMapAttemptCDFs() {
+    return successfulMapAttemptCDFs;
+  }
+
+  void setSuccessfulMapAttemptCDFs(
+      ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs) {
+    this.successfulMapAttemptCDFs = successfulMapAttemptCDFs;
+  }
+
+  public ArrayList<LoggedDiscreteCDF> getFailedMapAttemptCDFs() {
+    return failedMapAttemptCDFs;
+  }
+
+  void setFailedMapAttemptCDFs(ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs) {
+    this.failedMapAttemptCDFs = failedMapAttemptCDFs;
+  }
+
+  public LoggedDiscreteCDF getSuccessfulReduceAttemptCDF() {
+    return successfulReduceAttemptCDF;
+  }
+
+  void setSuccessfulReduceAttemptCDF(
+      LoggedDiscreteCDF successfulReduceAttemptCDF) {
+    this.successfulReduceAttemptCDF = successfulReduceAttemptCDF;
+  }
+
+  public LoggedDiscreteCDF getFailedReduceAttemptCDF() {
+    return failedReduceAttemptCDF;
+  }
+
+  void setFailedReduceAttemptCDF(LoggedDiscreteCDF failedReduceAttemptCDF) {
+    this.failedReduceAttemptCDF = failedReduceAttemptCDF;
+  }
+
+  public double[] getMapperTriesToSucceed() {
+    return mapperTriesToSucceed;
+  }
+
+  void setMapperTriesToSucceed(double[] mapperTriesToSucceed) {
+    this.mapperTriesToSucceed = mapperTriesToSucceed;
+  }
+
+  public double getFailedMapperFraction() {
+    return failedMapperFraction;
+  }
+
+  void setFailedMapperFraction(double failedMapperFraction) {
+    this.failedMapperFraction = failedMapperFraction;
+  }
+
+  public long getRelativeTime() {
+    return relativeTime;
+  }
+
+  void setRelativeTime(long relativeTime) {
+    this.relativeTime = relativeTime;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public int getClusterMapMB() {
+    return clusterMapMB;
+  }
+
+  void setClusterMapMB(int clusterMapMB) {
+    this.clusterMapMB = clusterMapMB;
+  }
+
+  public int getClusterReduceMB() {
+    return clusterReduceMB;
+  }
+
+  void setClusterReduceMB(int clusterReduceMB) {
+    this.clusterReduceMB = clusterReduceMB;
+  }
+
+  public int getJobMapMB() {
+    return jobMapMB;
+  }
+
+  void setJobMapMB(int jobMapMB) {
+    this.jobMapMB = jobMapMB;
+  }
+
+  public int getJobReduceMB() {
+    return jobReduceMB;
+  }
+
+  void setJobReduceMB(int jobReduceMB) {
+    this.jobReduceMB = jobReduceMB;
+  }
+
+  // Two nulls are equal; a null and a non-null, or unequal strings, are not.
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // Serves every enum-typed field (outcome, jobtype, priority): enum constants
+  // are singletons, so identity comparison is exact, and null == null holds.
+  private void compare1(Enum<?> c1, Enum<?> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(int c1, int c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  private void compare1(double c1, double c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // Arrays must agree in length and element by element; a differing element
+  // is reported with its index in the path.
+  private void compare1(double[] c1, double[] c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || c1.length != c2.length) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+
+    for (int i = 0; i < c1.length; ++i) {
+      if (c1[i] != c2[i]) {
+        throw new DeepInequalityException(eltname + " miscompared",
+            new TreePath(loc, eltname, i));
+      }
+    }
+  }
+
+  /*
+   * Compares two nested DeepCompare objects. An index of -1 means the value is
+   * a scalar field rather than a list element; otherwise the index appears in
+   * the failure message. Two nulls are equal; exactly one null is a mismatch.
+   */
+  private void compare1(DeepCompare c1, DeepCompare c2, TreePath loc,
+      String eltname, int index) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname, index);
+
+    if (c1 == null || c2 == null) {
+      if (index == -1) {
+        throw new DeepInequalityException(eltname + " miscompared", recursePath);
+      } else {
+        throw new DeepInequalityException(eltname + "[" + index
+            + "] miscompared", recursePath);
+      }
+    }
+
+    c1.deepCompare(c2, recursePath);
+  }
+
+  // I'll treat this as an atomic object type: the lists are compared with
+  // List.equals and any difference is reported against the whole list.
+  private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+  }
+
+  /*
+   * Compares two lists of DeepCompare objects (tasks or CDFs) element by
+   * element. The lists must have the same length. Each element pair goes
+   * through compare1 so that a null element is reported as a
+   * DeepInequalityException rather than causing a NullPointerException.
+   */
+  private void compareDeepLists(List<? extends DeepCompare> c1,
+      List<? extends DeepCompare> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      compare1(c1.get(i), c2.get(i), loc, eltname, i);
+    }
+  }
+
+  /**
+   * Compares every field of this job with {@code comparand}, in a fixed order,
+   * and throws on the first difference found.
+   *
+   * @param comparand
+   *          the object to compare against; must be a {@link LoggedJob}
+   * @param loc
+   *          the path of this object within the enclosing structure, used to
+   *          locate the difference in the exception
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a {@link LoggedJob} or any field
+   *           differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedJob)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedJob other = (LoggedJob) comparand;
+
+    compare1(jobID, other.jobID, loc, "jobID");
+    compare1(user, other.user, loc, "user");
+
+    compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
+        "computonsPerMapInputByte");
+    compare1(computonsPerMapOutputByte, other.computonsPerMapOutputByte, loc,
+        "computonsPerMapOutputByte");
+    compare1(computonsPerReduceInputByte, other.computonsPerReduceInputByte,
+        loc, "computonsPerReduceInputByte");
+    compare1(computonsPerReduceOutputByte, other.computonsPerReduceOutputByte,
+        loc, "computonsPerReduceOutputByte");
+
+    compare1(submitTime, other.submitTime, loc, "submitTime");
+    compare1(launchTime, other.launchTime, loc, "launchTime");
+    compare1(finishTime, other.finishTime, loc, "finishTime");
+
+    compare1(heapMegabytes, other.heapMegabytes, loc, "heapMegabytes");
+
+    compare1(totalMaps, other.totalMaps, loc, "totalMaps");
+    compare1(totalReduces, other.totalReduces, loc, "totalReduces");
+
+    compare1(outcome, other.outcome, loc, "outcome");
+    compare1(jobtype, other.jobtype, loc, "jobtype");
+    compare1(priority, other.priority, loc, "priority");
+
+    compareStrings(directDependantJobs, other.directDependantJobs, loc,
+        "directDependantJobs");
+
+    compareDeepLists(mapTasks, other.mapTasks, loc, "mapTasks");
+    compareDeepLists(reduceTasks, other.reduceTasks, loc, "reduceTasks");
+    compareDeepLists(otherTasks, other.otherTasks, loc, "otherTasks");
+
+    compare1(relativeTime, other.relativeTime, loc, "relativeTime");
+
+    compareDeepLists(successfulMapAttemptCDFs, other.successfulMapAttemptCDFs,
+        loc, "successfulMapAttemptCDFs");
+    compareDeepLists(failedMapAttemptCDFs, other.failedMapAttemptCDFs, loc,
+        "failedMapAttemptCDFs");
+    compare1(successfulReduceAttemptCDF, other.successfulReduceAttemptCDF, loc,
+        "successfulReduceAttemptCDF", -1);
+    compare1(failedReduceAttemptCDF, other.failedReduceAttemptCDF, loc,
+        "failedReduceAttemptCDF", -1);
+
+    compare1(mapperTriesToSucceed, other.mapperTriesToSucceed, loc,
+        "mapperTriesToSucceed");
+    compare1(failedMapperFraction, other.failedMapperFraction, loc,
+        "failedMapperFraction");
+
+    compare1(queue, other.queue, loc, "queue");
+    compare1(jobName, other.jobName, loc, "jobName");
+
+    compare1(clusterMapMB, other.clusterMapMB, loc, "clusterMapMB");
+    compare1(clusterReduceMB, other.clusterReduceMB, loc, "clusterReduceMB");
+    compare1(jobMapMB, other.jobMapMB, loc, "jobMapMB");
+    compare1(jobReduceMB, other.jobReduceMB, loc, "jobReduceMB");
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link LoggedLocation} is a representation of a point in a hierarchical
+ * network, represented as a series of membership names, broadest first.
+ *
+ * For example, if your network has <i>hosts</i> grouped into <i>racks</i>, then
+ * in one cluster you might have a node {@code node1} on rack {@code rack1}.
+ * This would be represented with an ArrayList of two layers, with two
+ * {@link String}s being {@code "rack1"} and {@code "node1"}.
+ *
+ * The details of this class are set up to meet the requirements of the Jackson
+ * JSON parser/generator.
+ *
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ *
+ */
+public class LoggedLocation implements DeepCompare {
+  // The full path from the root of the network to the host, broadest layer
+  // first.
+  //
+  // NOTE that this assumes that the network topology is a tree.
+  List<String> layers = new ArrayList<String>();
+
+  public List<String> getLayers() {
+    return layers;
+  }
+
+  void setLayers(List<String> layers) {
+    this.layers = layers;
+  }
+
+  // The layer list is treated as an atomic value: it is compared with
+  // List.equals, and any difference is reported against the whole list rather
+  // than against an individual layer. Two null lists are equal.
+  private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
+      String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    TreePath recursePath = new TreePath(loc, eltname);
+
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", recursePath);
+    }
+  }
+
+  /**
+   * Compares the layer list of this location with that of {@code comparand}.
+   *
+   * @param comparand
+   *          the object to compare against; must be a {@link LoggedLocation}
+   * @param loc
+   *          the path of this object within the enclosing structure
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a {@link LoggedLocation} or the
+   *           layers differ
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedLocation)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedLocation other = (LoggedLocation) comparand;
+
+    compareStrings(layers, other.layers, loc, "layers");
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+/**
+ * A {@link LoggedNetworkTopology} represents a tree that in turn represents a
+ * hierarchy of hosts. The current version requires the tree to have all leaves
+ * at the same level.
+ *
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ *
+ */
+public class LoggedNetworkTopology implements DeepCompare {
+  String name;
+  List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
+
+  public LoggedNetworkTopology() {
+    super();
+  }
+
+  /**
+   * Orders nodes by {@code name}. The {@code children} field is set-valued, so
+   * it is sorted before comparison. Otherwise comparisons could bogusly fail
+   * because a hash table happened to enumerate the children in a different
+   * order. A null name sorts before any non-null name.
+   */
+  static private class TopoSort implements Comparator<LoggedNetworkTopology> {
+    public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
+      String n1 = t1.name;
+      String n2 = t2.name;
+
+      if (n1 == null) {
+        return n2 == null ? 0 : -1;
+      }
+      if (n2 == null) {
+        return 1;
+      }
+      return n1.compareTo(n2);
+    }
+  }
+
+  /**
+   * Builds the subtree rooted at {@code name}. Below the last level, the hosts
+   * are grouped by their name component at {@code level}, and one child is
+   * built per group at {@code level + 1}. At the last level
+   * ({@code level >= ParsedHost.numberOfDistances() - 1}) the node is a leaf
+   * and {@code children} is left {@code null}.
+   *
+   * @param hosts
+   *          a HashSet of the {@link ParsedHost}
+   * @param name
+   *          the name of this level's host [for recursive descent]
+   * @param level
+   *          the level number
+   */
+  LoggedNetworkTopology(HashSet<ParsedHost> hosts, String name, int level) {
+    this.name = name;
+    this.children = null;
+
+    if (level < ParsedHost.numberOfDistances() - 1) {
+      HashMap<String, HashSet<ParsedHost>> topologies =
+          new HashMap<String, HashSet<ParsedHost>>();
+
+      for (ParsedHost host : hosts) {
+        String thisComponent = host.nameComponent(level);
+
+        HashSet<ParsedHost> thisSet = topologies.get(thisComponent);
+        if (thisSet == null) {
+          thisSet = new HashSet<ParsedHost>();
+          topologies.put(thisComponent, thisSet);
+        }
+
+        thisSet.add(host);
+      }
+
+      children = new ArrayList<LoggedNetworkTopology>();
+
+      for (Map.Entry<String, HashSet<ParsedHost>> ent : topologies.entrySet()) {
+        children.add(new LoggedNetworkTopology(ent.getValue(), ent.getKey(),
+            level + 1));
+      }
+    }
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  public List<LoggedNetworkTopology> getChildren() {
+    return children;
+  }
+
+  void setChildren(List<LoggedNetworkTopology> children) {
+    this.children = children;
+  }
+
+  // Two null names are equal; a null and a non-null, or unequal names, are
+  // not.
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null ? c2 != null : !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  /*
+   * Compares two child lists as sets: both lists must have the same size, and
+   * after ordering by name the children are compared pairwise. Sorted copies
+   * are used so that comparing does not reorder either object's children.
+   */
+  private void compare1(List<LoggedNetworkTopology> c1,
+      List<LoggedNetworkTopology> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    List<LoggedNetworkTopology> sorted1 = new ArrayList<LoggedNetworkTopology>(
+        c1);
+    List<LoggedNetworkTopology> sorted2 = new ArrayList<LoggedNetworkTopology>(
+        c2);
+    Comparator<LoggedNetworkTopology> byName = new TopoSort();
+    Collections.sort(sorted1, byName);
+    Collections.sort(sorted2, byName);
+
+    for (int i = 0; i < sorted1.size(); ++i) {
+      sorted1.get(i).deepCompare(sorted2.get(i), new TreePath(loc, eltname, i));
+    }
+  }
+
+  /**
+   * Compares this node's name and, recursively, its children with
+   * {@code comparand}. The order of the children is not significant.
+   *
+   * @param comparand
+   *          the object to compare against; must be a
+   *          {@link LoggedNetworkTopology}
+   * @param loc
+   *          the path of this node within the enclosing structure
+   * @throws DeepInequalityException
+   *           if {@code comparand} has the wrong type, or the names or
+   *           children differ
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedNetworkTopology)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedNetworkTopology other = (LoggedNetworkTopology) comparand;
+
+    compare1(name, other.name, loc, "name");
+    compare1(children, other.children, loc, "children");
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * One point of a discrete CDF. A {@link LoggedSingleRelativeRanking} pairs an X
+ * coordinate ({@code datum}) with a Y coordinate ({@code relativeRanking}).
+ *
+ * The public methods exist only to expose the instance variables that are
+ * written to the JSON files.
+ */
+public class LoggedSingleRelativeRanking implements DeepCompare {
+  /**
+   * The Y coordinate, as a fraction {@code ( 0.0D, 1.0D )}. The initial value
+   * of -1 marks a value that has not been filled in.
+   */
+  double relativeRanking = -1.0D;
+
+  /** The X coordinate; initialized to -1. */
+  long datum = -1L;
+
+  public double getRelativeRanking() {
+    return relativeRanking;
+  }
+
+  void setRelativeRanking(double relativeRanking) {
+    this.relativeRanking = relativeRanking;
+  }
+
+  public long getDatum() {
+    return datum;
+  }
+
+  void setDatum(long datum) {
+    this.datum = datum;
+  }
+
+  /** Builds the exception reported when the field {@code field} differs. */
+  private static DeepInequalityException miscompared(TreePath where,
+      String field) {
+    return new DeepInequalityException(field + " miscompared", new TreePath(
+        where, field));
+  }
+
+  /**
+   * Checks that {@code comparand} is a point with exactly the same Y and X
+   * coordinates. The Y coordinate is checked first.
+   *
+   * @param comparand
+   *          the object to compare against
+   * @param loc
+   *          the path of this point within the enclosing structure
+   * @throws DeepInequalityException
+   *           on a type mismatch or the first differing coordinate
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedSingleRelativeRanking)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedSingleRelativeRanking that = (LoggedSingleRelativeRanking) comparand;
+
+    if (relativeRanking != that.relativeRanking) {
+      throw miscompared(loc, "relativeRanking");
+    }
+    if (datum != that.datum) {
+      throw miscompared(loc, "datum");
+    }
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobHistory;
+
+/**
+ * A {@link LoggedTask} represents a [hadoop] task that is part of a hadoop job.
+ * It knows about the [possibly empty] sequence of attempts, its I/O footprint,
+ * and its runtime.
+ *
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ *
+ */
+public class LoggedTask implements DeepCompare {
+  long inputBytes = -1L;
+  long inputRecords = -1L;
+  long outputBytes = -1L;
+  long outputRecords = -1L;
+  String taskID;
+  long startTime = -1L;
+  long finishTime = -1L;
+  JobHistory.Values taskType;
+  JobHistory.Values taskStatus;
+  List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
+
+  ArrayList<LoggedLocation> preferredLocations = new ArrayList<LoggedLocation>();
+
+  int numberMaps = -1;
+  int numberReduces = -1;
+
+  LoggedTask() {
+    super();
+  }
+
+  public long getInputBytes() {
+    return inputBytes;
+  }
+
+  void setInputBytes(long inputBytes) {
+    this.inputBytes = inputBytes;
+  }
+
+  public long getInputRecords() {
+    return inputRecords;
+  }
+
+  void setInputRecords(long inputRecords) {
+    this.inputRecords = inputRecords;
+  }
+
+  public long getOutputBytes() {
+    return outputBytes;
+  }
+
+  void setOutputBytes(long outputBytes) {
+    this.outputBytes = outputBytes;
+  }
+
+  public long getOutputRecords() {
+    return outputRecords;
+  }
+
+  void setOutputRecords(long outputRecords) {
+    this.outputRecords = outputRecords;
+  }
+
+  public String getTaskID() {
+    return taskID;
+  }
+
+  void setTaskID(String taskID) {
+    this.taskID = taskID;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public List<LoggedTaskAttempt> getAttempts() {
+    return attempts;
+  }
+
+  void setAttempts(List<LoggedTaskAttempt> attempts) {
+    this.attempts = attempts;
+  }
+
+  public ArrayList<LoggedLocation> getPreferredLocations() {
+    return preferredLocations;
+  }
+
+  void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) {
+    this.preferredLocations = preferredLocations;
+  }
+
+  public int getNumberMaps() {
+    return numberMaps;
+  }
+
+  void setNumberMaps(int numberMaps) {
+    this.numberMaps = numberMaps;
+  }
+
+  public int getNumberReduces() {
+    return numberReduces;
+  }
+
+  void setNumberReduces(int numberReduces) {
+    this.numberReduces = numberReduces;
+  }
+
+  public JobHistory.Values getTaskStatus() {
+    return taskStatus;
+  }
+
+  void setTaskStatus(JobHistory.Values taskStatus) {
+    this.taskStatus = taskStatus;
+  }
+
+  public JobHistory.Values getTaskType() {
+    return taskType;
+  }
+
+  void setTaskType(JobHistory.Values taskType) {
+    this.taskType = taskType;
+  }
+
+  private void compare1(long c1, long c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 != c2) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // Two nulls are equal; a null and a non-null, or unequal strings, are not.
+  private void compare1(String c1, String c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  // Two nulls are equal; a null and a non-null, or unequal values, are not.
+  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+      TreePath loc, String eltname) throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+    if (c1 == null || c2 == null || !c1.equals(c2)) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+  }
+
+  /*
+   * Compares two lists of DeepCompare objects (attempts or locations) element
+   * by element. The lists must have the same length. A null element equals
+   * only another null element and is reported as a DeepInequalityException
+   * rather than causing a NullPointerException.
+   */
+  private void compareDeepLists(List<? extends DeepCompare> c1,
+      List<? extends DeepCompare> c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+          loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      DeepCompare e1 = c1.get(i);
+      DeepCompare e2 = c2.get(i);
+      TreePath elementPath = new TreePath(loc, eltname, i);
+
+      if (e1 == null && e2 == null) {
+        continue;
+      }
+      if (e1 == null || e2 == null) {
+        throw new DeepInequalityException(eltname + "[" + i + "] miscompared",
+            elementPath);
+      }
+
+      e1.deepCompare(e2, elementPath);
+    }
+  }
+
+  /**
+   * Compares every field of this task with {@code comparand}, in a fixed
+   * order, and throws on the first difference found.
+   *
+   * @param comparand
+   *          the object to compare against; must be a {@link LoggedTask}
+   * @param loc
+   *          the path of this task within the enclosing structure
+   * @throws DeepInequalityException
+   *           if {@code comparand} is not a {@link LoggedTask} or any field
+   *           differs
+   */
+  public void deepCompare(DeepCompare comparand, TreePath loc)
+      throws DeepInequalityException {
+    if (!(comparand instanceof LoggedTask)) {
+      throw new DeepInequalityException("comparand has wrong type", loc);
+    }
+
+    LoggedTask other = (LoggedTask) comparand;
+
+    compare1(inputBytes, other.inputBytes, loc, "inputBytes");
+    compare1(inputRecords, other.inputRecords, loc, "inputRecords");
+    compare1(outputBytes, other.outputBytes, loc, "outputBytes");
+    compare1(outputRecords, other.outputRecords, loc, "outputRecords");
+
+    compare1(taskID, other.taskID, loc, "taskID");
+
+    compare1(startTime, other.startTime, loc, "startTime");
+    compare1(finishTime, other.finishTime, loc, "finishTime");
+
+    compare1(taskType, other.taskType, loc, "taskType");
+    compare1(taskStatus, other.taskStatus, loc, "taskStatus");
+
+    compareDeepLists(attempts, other.attempts, loc, "attempts");
+    compareDeepLists(preferredLocations, other.preferredLocations, loc,
+        "preferredLocations");
+
+    // These fields were previously left out of the comparison, so two tasks
+    // that differed only in them compared equal.
+    compare1(numberMaps, other.numberMaps, loc, "numberMaps");
+    compare1(numberReduces, other.numberReduces, loc, "numberReduces");
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.JobHistory;
+
// HACK ALERT!!! This "should" have two subclasses, which might be called
// LoggedMapTaskAttempt and LoggedReduceTaskAttempt, but
// the Jackson implementation of JSON doesn't handle a
// superclass-valued field.
+
+/**
+ * A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
+ * hadoop job. Note that a task can have several attempts.
+ *
+ * All of the public methods are simply accessors for the instance variables we
+ * want to write out in the JSON files.
+ *
+ */
+public class LoggedTaskAttempt implements DeepCompare {
+
+ String attemptID;
+ JobHistory.Values result;
+ long startTime = -1L;
+ long finishTime = -1L;
+ String hostName;
+
+ long hdfsBytesRead = -1L;
+ long hdfsBytesWritten = -1L;
+ long fileBytesRead = -1L;
+ long fileBytesWritten = -1L;
+ long mapInputRecords = -1L;
+ long mapInputBytes = -1L;
+ long mapOutputBytes = -1L;
+ long mapOutputRecords = -1L;
+ long combineInputRecords = -1L;
+ long reduceInputGroups = -1L;
+ long reduceInputRecords = -1L;
+ long reduceShuffleBytes = -1L;
+ long reduceOutputRecords = -1L;
+ long spilledRecords = -1L;
+
+ long shuffleFinished = -1L;
+ long sortFinished = -1L;
+
+ LoggedLocation location;
+
+ LoggedTaskAttempt() {
+ super();
+ }
+
+ public long getShuffleFinished() {
+ return shuffleFinished;
+ }
+
+ void setShuffleFinished(long shuffleFinished) {
+ this.shuffleFinished = shuffleFinished;
+ }
+
+ public long getSortFinished() {
+ return sortFinished;
+ }
+
+ void setSortFinished(long sortFinished) {
+ this.sortFinished = sortFinished;
+ }
+
+ public String getAttemptID() {
+ return attemptID;
+ }
+
+ void setAttemptID(String attemptID) {
+ this.attemptID = attemptID;
+ }
+
+ public JobHistory.Values getResult() {
+ return result;
+ }
+
+ void setResult(JobHistory.Values result) {
+ this.result = result;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public long getHdfsBytesRead() {
+ return hdfsBytesRead;
+ }
+
+ void setHdfsBytesRead(long hdfsBytesRead) {
+ this.hdfsBytesRead = hdfsBytesRead;
+ }
+
+ public long getHdfsBytesWritten() {
+ return hdfsBytesWritten;
+ }
+
+ void setHdfsBytesWritten(long hdfsBytesWritten) {
+ this.hdfsBytesWritten = hdfsBytesWritten;
+ }
+
+ public long getFileBytesRead() {
+ return fileBytesRead;
+ }
+
+ void setFileBytesRead(long fileBytesRead) {
+ this.fileBytesRead = fileBytesRead;
+ }
+
+ public long getFileBytesWritten() {
+ return fileBytesWritten;
+ }
+
+ void setFileBytesWritten(long fileBytesWritten) {
+ this.fileBytesWritten = fileBytesWritten;
+ }
+
+ public long getMapInputRecords() {
+ return mapInputRecords;
+ }
+
+ void setMapInputRecords(long mapInputRecords) {
+ this.mapInputRecords = mapInputRecords;
+ }
+
+ public long getMapOutputBytes() {
+ return mapOutputBytes;
+ }
+
+ void setMapOutputBytes(long mapOutputBytes) {
+ this.mapOutputBytes = mapOutputBytes;
+ }
+
+ public long getMapOutputRecords() {
+ return mapOutputRecords;
+ }
+
+ void setMapOutputRecords(long mapOutputRecords) {
+ this.mapOutputRecords = mapOutputRecords;
+ }
+
+ public long getCombineInputRecords() {
+ return combineInputRecords;
+ }
+
+ void setCombineInputRecords(long combineInputRecords) {
+ this.combineInputRecords = combineInputRecords;
+ }
+
+ public long getReduceInputGroups() {
+ return reduceInputGroups;
+ }
+
+ void setReduceInputGroups(long reduceInputGroups) {
+ this.reduceInputGroups = reduceInputGroups;
+ }
+
+ public long getReduceInputRecords() {
+ return reduceInputRecords;
+ }
+
+ void setReduceInputRecords(long reduceInputRecords) {
+ this.reduceInputRecords = reduceInputRecords;
+ }
+
+ public long getReduceShuffleBytes() {
+ return reduceShuffleBytes;
+ }
+
+ void setReduceShuffleBytes(long reduceShuffleBytes) {
+ this.reduceShuffleBytes = reduceShuffleBytes;
+ }
+
+ public long getReduceOutputRecords() {
+ return reduceOutputRecords;
+ }
+
+ void setReduceOutputRecords(long reduceOutputRecords) {
+ this.reduceOutputRecords = reduceOutputRecords;
+ }
+
+ public long getSpilledRecords() {
+ return spilledRecords;
+ }
+
+ void setSpilledRecords(long spilledRecords) {
+ this.spilledRecords = spilledRecords;
+ }
+
+ public LoggedLocation getLocation() {
+ return location;
+ }
+
+ void setLocation(LoggedLocation location) {
+ this.location = location;
+ }
+
+ public long getMapInputBytes() {
+ return mapInputBytes;
+ }
+
+ void setMapInputBytes(long mapInputBytes) {
+ this.mapInputBytes = mapInputBytes;
+ }
+
+ private void compare1(String c1, String c2, TreePath loc, String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
+ }
+
+ if (c1 == null || c2 == null || !c1.equals(c2)) {
+ throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+ loc, eltname));
+ }
+ }
+
+ private void compare1(long c1, long c2, TreePath loc, String eltname)
+ throws DeepInequalityException {
+ if (c1 != c2) {
+ throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+ loc, eltname));
+ }
+ }
+
+ private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+ TreePath loc, String eltname) throws DeepInequalityException {
+ if (c1 != c2) {
+ throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+ loc, eltname));
+ }
+ }
+
+ private void compare1(LoggedLocation c1, LoggedLocation c2, TreePath loc,
+ String eltname) throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
+ }
+
+ TreePath recurse = new TreePath(loc, eltname);
+
+ if (c1 == null || c2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared", recurse);
+ }
+
+ c1.deepCompare(c2, recurse);
+ }
+
+ public void deepCompare(DeepCompare comparand, TreePath loc)
+ throws DeepInequalityException {
+ if (!(comparand instanceof LoggedTaskAttempt)) {
+ throw new DeepInequalityException("comparand has wrong type", loc);
+ }
+
+ LoggedTaskAttempt other = (LoggedTaskAttempt) comparand;
+
+ compare1(attemptID, other.attemptID, loc, "attemptID");
+ compare1(result, other.result, loc, "result");
+ compare1(startTime, other.startTime, loc, "startTime");
+ compare1(finishTime, other.finishTime, loc, "finishTime");
+ compare1(hostName, other.hostName, loc, "hostName");
+
+ compare1(hdfsBytesRead, other.hdfsBytesRead, loc, "hdfsBytesRead");
+ compare1(hdfsBytesWritten, other.hdfsBytesWritten, loc, "hdfsBytesWritten");
+ compare1(fileBytesRead, other.fileBytesRead, loc, "fileBytesRead");
+ compare1(fileBytesWritten, other.fileBytesWritten, loc, "fileBytesWritten");
+ compare1(mapInputBytes, other.mapInputBytes, loc, "mapInputBytes");
+ compare1(mapInputRecords, other.mapInputRecords, loc, "mapInputRecords");
+ compare1(mapOutputBytes, other.mapOutputBytes, loc, "mapOutputBytes");
+ compare1(mapOutputRecords, other.mapOutputRecords, loc, "mapOutputRecords");
+ compare1(combineInputRecords, other.combineInputRecords, loc,
+ "combineInputRecords");
+ compare1(reduceInputGroups, other.reduceInputGroups, loc,
+ "reduceInputGroups");
+ compare1(reduceInputRecords, other.reduceInputRecords, loc,
+ "reduceInputRecords");
+ compare1(reduceShuffleBytes, other.reduceShuffleBytes, loc,
+ "reduceShuffleBytes");
+ compare1(reduceOutputRecords, other.reduceOutputRecords, loc,
+ "reduceOutputRecords");
+ compare1(spilledRecords, other.spilledRecords, loc, "spilledRecords");
+
+ compare1(shuffleFinished, other.shuffleFinished, loc, "shuffleFinished");
+ compare1(sortFinished, other.sortFinished, loc, "sortFinished");
+
+ compare1(location, other.location, loc, "location");
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+public class MapTaskAttemptInfo extends TaskAttemptInfo {
+
+ private long runtime;
+
+ public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {
+ super(state, taskInfo);
+ this.runtime = runtime;
+ }
+
+ @Override
+ public long getRuntime() {
+ return getMapRuntime();
+ }
+
+ /**
+ * Get the runtime for the <b>map</b> phase of the map-task attempt.
+ *
+ * @return the runtime for the <b>map</b> phase of the map-task attempt
+ */
+ public long getMapRuntime() {
+ return runtime;
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pair.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
/**
 * Immutable holder for two values, possibly of different types. No null
 * checks are made; either component may be null.
 *
 * @param <F> type of the first component
 * @param <S> type of the second component
 */
class Pair<F, S> {
  private final F head;
  private final S tail;

  /** Creates a pair holding {@code first} and {@code second}. */
  Pair(F first, S second) {
    head = first;
    tail = second;
  }

  /** @return the first component, exactly as passed to the constructor */
  F first() {
    return head;
  }

  /** @return the second component, exactly as passed to the constructor */
  S second() {
    return tail;
  }
}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.io.StringReader;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Element;
+import org.w3c.dom.Text;
+
+import org.xml.sax.SAXException;
+
/**
 * Extracts a few job-level settings from the XML text of a job configuration
 * file, plus the job ID embedded in the line that names that file.
 *
 * Settings that are absent or unparseable keep their defaults: -1 for the
 * numeric fields and null for the string fields. {@code valid} is false when
 * the XML cannot be parsed or its root element is not
 * {@code <configuration>}.
 */
class ParsedConfigFile {
  static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");

  // Captures the size digits and the unit (m/M or g/G) of an -Xmx option.
  static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");

  /** Heap size from mapred.child.java.opts, in megabytes; -1 if unknown. */
  final int heapMegabytes;

  /** Value of mapred.job.queue.name, or null. */
  final String queue;
  /** Value of mapred.job.name, or null. */
  final String jobName;

  // Values of mapred.cluster.{map,reduce}.memory.mb and
  // mapred.job.{map,reduce}.memory.mb; -1 if absent or not an integer.
  final int clusterMapMB;
  final int clusterReduceMB;
  final int jobMapMB;
  final int jobReduceMB;

  /** Job ID taken from the file-name line, or null if none was found. */
  final String jobID;

  /** False if the XML failed to parse or had the wrong root element. */
  final boolean valid;

  /**
   * Returns {@code value} parsed as an int when {@code attr} names
   * {@code propName}. Otherwise, or if the value is not an integer, returns
   * {@code oldValue}.
   */
  private int maybeGetIntValue(String propName, String attr, String value,
      int oldValue) {
    if (propName.equals(attr) && value != null) {
      try {
        // Values are stored untrimmed, so tolerate surrounding whitespace.
        return Integer.parseInt(value.trim());
      } catch (NumberFormatException e) {
        return oldValue;
      }
    }

    return oldValue;
  }

  /**
   * Converts the digits and unit captured by {@link #heapPattern} to
   * megabytes.
   *
   * @return the heap size in megabytes, or {@code oldValue} if the size does
   *         not fit in an int
   */
  private static int parseHeapMegabytes(String digits, String unit,
      int oldValue) {
    long megabytes;
    try {
      megabytes = Long.parseLong(digits);
    } catch (NumberFormatException e) {
      return oldValue; // more digits than even a long can hold
    }

    if (unit.equalsIgnoreCase("G")) {
      if (megabytes > Integer.MAX_VALUE / 1024) {
        return oldValue;
      }
      megabytes *= 1024;
    }

    return megabytes > Integer.MAX_VALUE ? oldValue : (int) megabytes;
  }

  /**
   * @param filenameLine text containing the configuration file's name; the
   *          job ID is taken from the first "_job_N_N_" in it
   * @param xmlString the configuration file's XML content
   */
  ParsedConfigFile(String filenameLine, String xmlString) {
    int heapMegabytes = -1;

    String queue = null;
    String jobName = null;

    int clusterMapMB = -1;
    int clusterReduceMB = -1;
    int jobMapMB = -1;
    int jobReduceMB = -1;

    String jobID = null;

    boolean valid = true;

    Matcher jobIDMatcher = jobIDPattern.matcher(filenameLine);

    if (jobIDMatcher.find()) {
      jobID = jobIDMatcher.group(1);
    }

    try {
      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();

      DocumentBuilder db = dbf.newDocumentBuilder();

      // Parse characters rather than bytes: String.getBytes() would encode with
      // the platform charset, which may disagree with the encoding the XML
      // declares.
      Document doc = db.parse(new org.xml.sax.InputSource(new StringReader(
          xmlString)));

      Element root = doc.getDocumentElement();

      if (!"configuration".equals(root.getTagName())) {
        System.out.print("root is not a configuration node");
        valid = false;
      }

      NodeList props = root.getChildNodes();

      for (int i = 0; i < props.getLength(); ++i) {
        Node propNode = props.item(i);
        if (!(propNode instanceof Element)) {
          continue;
        }
        Element prop = (Element) propNode;
        if (!"property".equals(prop.getTagName())) {
          System.out.print("bad conf file: element not <property>");
        }
        NodeList fields = prop.getChildNodes();
        String attr = null;
        String value = null;
        for (int j = 0; j < fields.getLength(); j++) {
          Node fieldNode = fields.item(j);
          if (!(fieldNode instanceof Element)) {
            continue;
          }

          Element field = (Element) fieldNode;
          // getTextContent() skips comment children. Casting the first child
          // to Text would throw ClassCastException when that child is a
          // comment.
          if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
            attr = field.getTextContent().trim();
          }
          if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
            value = field.getTextContent();
          }
        }

        if ("mapred.child.java.opts".equals(attr) && value != null) {
          Matcher matcher = heapPattern.matcher(value);
          if (matcher.find()) {
            heapMegabytes = parseHeapMegabytes(matcher.group(1), matcher
                .group(2), heapMegabytes);
          }
        }

        if ("mapred.job.queue.name".equals(attr) && value != null) {
          queue = value;
        }

        if ("mapred.job.name".equals(attr) && value != null) {
          jobName = value;
        }

        clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
            value, clusterMapMB);
        clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
            attr, value, clusterReduceMB);
        jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
            jobMapMB);
        jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
            value, jobReduceMB);
      }
      // valid is deliberately not reset here, so a wrong root element keeps
      // the file marked invalid.
    } catch (ParserConfigurationException e) {
      valid = false;
    } catch (SAXException e) {
      valid = false;
    } catch (IOException e) {
      valid = false;
    }

    this.heapMegabytes = heapMegabytes;

    this.queue = queue;
    this.jobName = jobName;

    this.clusterMapMB = clusterMapMB;
    this.clusterReduceMB = clusterReduceMB;
    this.jobMapMB = jobMapMB;
    this.jobReduceMB = jobReduceMB;

    this.jobID = jobID;

    this.valid = valid;
  }
}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+class ParsedHost {
+ String rackName;
+ String nodeName;
+
+ private static Pattern splitPattern = Pattern
+ .compile("/([0-9\\\\\\.]+)/(.+)");
+
+ static int numberOfDistances() {
+ return 3;
+ }
+
+ /**
+ * @return the components, broadest first [ie., the last element is always the
+ * individual node name]
+ */
+ String[] nameComponents() {
+ String[] result = new String[2];
+
+ result[0] = rackName;
+ result[1] = nodeName;
+
+ return result;
+ }
+
+ String nameComponent(int i) throws IllegalArgumentException {
+ switch (i) {
+ case 0:
+ return rackName;
+
+ case 1:
+ return nodeName;
+
+ default:
+ throw new IllegalArgumentException(
+ "Host location component index out of range.");
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return rackName.hashCode() * 17 + nodeName.hashCode();
+ }
+
+ ParsedHost(String name) throws IllegalArgumentException {
+ // separate out the node name
+ Matcher matcher = splitPattern.matcher(name);
+
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Illegal node designator: \"" + name
+ + "\"");
+ }
+
+ rackName = matcher.group(1);
+ nodeName = matcher.group(2);
+ }
+
+ public ParsedHost(LoggedLocation loc) {
+ List<String> coordinates = loc.getLayers();
+
+ rackName = coordinates.get(0);
+ nodeName = coordinates.get(1);
+ }
+
+ LoggedLocation makeLoggedLocation() {
+ LoggedLocation result = new LoggedLocation();
+
+ List<String> coordinates = new ArrayList<String>();
+
+ coordinates.add(rackName);
+ coordinates.add(nodeName);
+
+ result.setLayers(coordinates);
+
+ return result;
+ }
+
+ // expects the broadest name first
+ ParsedHost(String[] names) {
+ rackName = names[0];
+ nodeName = names[1];
+ }
+
+ // returns the broadest name first
+ String[] nameList() {
+ String[] result = new String[2];
+
+ result[0] = rackName;
+ result[1] = nodeName;
+
+ return result;
+ }
+
+ public boolean equals(Object other) {
+ if (other instanceof ParsedHost) {
+ return distance((ParsedHost) other) == 0;
+ }
+
+ return false;
+ }
+
+ int distance(ParsedHost other) {
+ if (nodeName.equals(other.nodeName)) {
+ return 0;
+ }
+
+ if (rackName.equals(other.rackName)) {
+ return 1;
+ }
+
+ return 2;
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/**
+ *
+ */
+class ParsedLine {
+
+ Properties content;
+
+ LogRecordType type;
+
+ static Pattern keyValPair = Pattern
+ .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+
+ /**
+ *
+ */
+ ParsedLine() {
+ }
+
+ ParsedLine(String fullLine, int version) {
+ super();
+
+ content = new Properties();
+
+ int firstSpace = fullLine.indexOf(" ");
+
+ if (firstSpace < 0) {
+ firstSpace = fullLine.length();
+ }
+
+ if (firstSpace == 0) {
+ return; // This is a junk line of some sort
+ }
+
+ type = LogRecordType.intern(fullLine.substring(0, firstSpace));
+
+ String propValPairs = fullLine.substring(firstSpace + 1);
+
+ while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
+ propValPairs = propValPairs.substring(1);
+ }
+
+ int cursor = 0;
+
+ while (cursor < propValPairs.length()) {
+ int equals = propValPairs.indexOf('=', cursor);
+
+ if (equals < 0) {
+ // maybe we do some error processing
+ return;
+ }
+
+ int nextCursor;
+
+ int endValue;
+
+ if (propValPairs.charAt(equals + 1) == '\"') {
+ int closeQuote = propValPairs.indexOf('\"', equals + 2);
+
+ nextCursor = closeQuote + 1;
+
+ endValue = closeQuote;
+
+ if (closeQuote < 0) {
+ endValue = propValPairs.length();
+
+ nextCursor = endValue;
+ }
+ } else {
+ int closeSpace = propValPairs.indexOf(' ', equals + 1);
+
+ if (closeSpace < 0) {
+ closeSpace = propValPairs.length();
+ }
+
+ endValue = closeSpace;
+
+ nextCursor = endValue;
+ }
+
+ content.setProperty(propValPairs.substring(cursor, equals), propValPairs
+ .substring(equals + 2, endValue));
+
+ cursor = nextCursor;
+
+ while (cursor < propValPairs.length()
+ && propValPairs.charAt(cursor) == ' ') {
+ ++cursor;
+ }
+ }
+ }
+
+ protected LogRecordType getType() {
+ return type;
+ }
+
+ protected String get(String key) {
+ return content.getProperty(key);
+ }
+
+ protected long getLong(String key) {
+ String val = get(key);
+
+ return Long.parseLong(val);
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.InputStream;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.DeserializationConfig;
+
+/**
+ * {@link Parser} is an interface to the underlining JSON files. To use the
+ * parser, create a Parser object, and Parser.getNextJob() will return next job
+ * in the trace every time it is called.
+ */
+public class Parser implements Closeable {
+ private final JsonParser parser;
+ private final ObjectMapper mapper;
+ private LoggedNetworkTopology topology;
+
+ // DEBUG
+ public Parser() {
+ parser = null;
+ mapper = null;
+ }
+
+ public Parser(java.io.Reader digest) throws IOException {
+ mapper = new ObjectMapper();
+ mapper.configure(
+ DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ parser = mapper.getJsonFactory().createJsonParser(digest);
+ }
+
+ public Parser(InputStream stream) throws JsonParseException, IOException {
+ mapper = new ObjectMapper();
+ mapper.configure(
+ DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ parser = mapper.getJsonFactory().createJsonParser(stream);
+ }
+
+ public JobStory getNextJob() throws IOException {
+ try {
+ final LoggedJob job = mapper.readValue(parser, LoggedJob.class);
+ return null == job ? null : new ZombieJob(job, topology);
+ } catch (EOFException e) {
+ return null;
+ } catch (JsonProcessingException e) {
+ throw new IOException(e);
+ }
+ // System.out.println(job.getJobID() + ": user-" + job.getUser() + ": "
+ // + job.getMapTasks().size() + "-"
+ // + job.getReduceTasks().size() + "-"
+ // + job.getOtherTasks().size() + ", @ "
+ // + (new Date(job.getLaunchTime())).toString());
+ }
+
+ public LoggedNetworkTopology readTopology(File topologyFile)
+ throws JsonParseException, JsonMappingException, IOException {
+ this.topology = mapper.readValue(topologyFile, LoggedNetworkTopology.class);
+ return this.topology;
+ }
+
+ public LoggedNetworkTopology readTopology(InputStream stream)
+ throws JsonParseException, JsonMappingException, IOException {
+ this.topology = mapper.readValue(stream, LoggedNetworkTopology.class);
+ return this.topology;
+ }
+
+ public void close() throws IOException {
+ parser.close();
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
+
+ private long shuffleTime;
+ private long mergeTime;
+ private long reduceTime;
+
+ public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+ long mergeTime, long reduceTime) {
+ super(state, taskInfo);
+ this.shuffleTime = shuffleTime;
+ this.mergeTime = mergeTime;
+ this.reduceTime = reduceTime;
+ }
+
+ /**
+ * Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
+ *
+ * @return the runtime for the <b>reduce</b> phase of the reduce task-attempt
+ */
+ public long getReduceRuntime() {
+ return reduceTime;
+ }
+
+ /**
+ * Get the runtime for the <b>shuffle</b> phase of the reduce task-attempt.
+ *
+ * @return the runtime for the <b>shuffle</b> phase of the reduce task-attempt
+ */
+ public long getShuffleRuntime() {
+ return shuffleTime;
+ }
+
+ /**
+ * Get the runtime for the <b>merge</b> phase of the reduce task-attempt
+ *
+ * @return the runtime for the <b>merge</b> phase of the reduce task-attempt
+ */
+ public long getMergeRuntime() {
+ return mergeTime;
+ }
+
+ @Override
+ public long getRuntime() {
+ // XXX Does this make sense? Should TaskAttemptInfo.getRuntime() return
+ // total reduce runtime, rather than reduce phase time?
+ return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+/**
+ * {@link TaskAttemptInfo} is a collection of statistics about a particular
+ * task-attempt gleaned from job-history of the job.
+ */
+public abstract class TaskAttemptInfo {
+ protected final State state;
+ protected final TaskInfo taskInfo;
+
+ protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+ if (state == State.SUCCEEDED || state == State.FAILED) {
+ this.state = state;
+ } else {
+ throw new IllegalArgumentException("status cannot be " + state);
+ }
+ this.taskInfo = taskInfo;
+ }
+
+ /**
+ * Get the final {@link TaskStatus.State} of the task-attempt.
+ *
+ * @return the final <code>State</code> of the task-attempt
+ */
+ public State getRunState() {
+ return state;
+ }
+
+ /**
+ * Get the total runtime for the task-attempt.
+ *
+ * @return the total runtime for the task-attempt
+ */
+ public abstract long getRuntime();
+
+ /**
+ * Get the {@link TaskInfo} for the given task-attempt.
+ *
+ * @return the <code>TaskInfo</code> for the given task-attempt
+ */
+ public TaskInfo getTaskInfo() {
+ return taskInfo;
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+public class TaskInfo {
+
+ private final long bytesIn;
+ private final int recsIn;
+ private final long bytesOut;
+ private final int recsOut;
+ private final long maxMemory;
+
+ public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, int maxMemory) {
+ this.bytesIn = bytesIn;
+ this.recsIn = recsIn;
+ this.bytesOut = bytesOut;
+ this.recsOut = recsOut;
+ this.maxMemory = maxMemory;
+ }
+
+ /**
+ * @return Raw bytes read from the FileSystem into the task. Note that this
+ * may not always match the input bytes to the task.
+ */
+ public long getInputBytes() {
+ return bytesIn;
+ }
+
+ /**
+ * @return Number of records input to this task.
+ */
+ public int getInputRecords() {
+ return recsIn;
+ }
+
+ /**
+ * @return Raw bytes written to the destination FileSystem. Note that this may
+ * not match output bytes.
+ */
+ public long getOutputBytes() {
+ return bytesOut;
+ }
+
+ /**
+ * @return Number of records output from this task.
+ */
+ public int getOutputRecords() {
+ return recsOut;
+ }
+
+ /**
+ * @return Memory used by the task leq the heap size.
+ */
+ public long getTaskMemory() {
+ return maxMemory;
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * This describes a path from a node to the root. We use it when we compare two
+ * trees during rumen unit tests. If the trees are not identical, this chain
+ * will be converted to a string which describes the path from the root to the
+ * fields that did not compare equal.
+ */
+public class TreePath {
+  /** The path to the enclosing node, or {@code null} for the root segment. */
+  final TreePath parent;
+
+  /** Name of the field this path segment refers to. */
+  final String fieldName;
+
+  /** Element index within {@link #fieldName}, or -1 if the segment is not indexed. */
+  final int index;
+
+  /**
+   * Creates an unindexed path segment.
+   *
+   * @param parent the enclosing path, or {@code null} for the root
+   * @param fieldName name of the field this segment refers to
+   */
+  public TreePath(TreePath parent, String fieldName) {
+    this(parent, fieldName, -1);
+  }
+
+  /**
+   * Creates a path segment referring to one element of a field.
+   *
+   * @param parent the enclosing path, or {@code null} for the root
+   * @param fieldName name of the field this segment refers to
+   * @param index element index; -1 means the segment is rendered unindexed
+   */
+  public TreePath(TreePath parent, String fieldName, int index) {
+    this.parent = parent;
+    this.fieldName = fieldName;
+    this.index = index;
+  }
+
+  /**
+   * Renders the path from the root down to this segment, joining segments
+   * with {@code "-->"} and appending {@code [index]} to indexed segments.
+   *
+   * @return e.g. {@code "root-->tasks[3]-->attempt"}
+   */
+  @Override
+  public String toString() {
+    String mySegment = fieldName + (index == -1 ? "" : ("[" + index + "]"));
+
+    return ((parent == null) ? "" : parent.toString() + "-->") + mySegment;
+  }
+}