You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/12/17 01:31:50 UTC
svn commit: r1215367 [2/2] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-a...
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 00:31:46 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:1161333-1214939
+/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:1161333-1215364
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:776175-785643
/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:817878-835934
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 00:31:46 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:1159757-1214939
+/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:1159757-1215364
/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr/org/apache/hadoop/ipc:713112
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/ipc:713112
/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/ipc:776175-784663
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 00:31:46 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:1161333-1214939
+/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:1161333-1215364
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:776175-785643
/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 00:31:46 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:1161333-1214939
+/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:1161333-1215364
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:776175-785643
/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:817878-835934
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java Sat Dec 17 00:31:46 2011
@@ -20,12 +20,8 @@ package org.apache.hadoop.tools.rumen;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Sat Dec 17 00:31:46 2011
@@ -960,11 +960,11 @@ public class TestRumenJobTraces {
for (LoggedNetworkTopology rack : racks) {
List<LoggedNetworkTopology> nodes = rack.getChildren();
- if (rack.getName().endsWith(".64")) {
+ if (rack.getName().getValue().endsWith(".64")) {
assertEquals("The singleton rack has the wrong number of elements", 1,
nodes.size());
sawSingleton = true;
- } else if (rack.getName().endsWith(".80")) {
+ } else if (rack.getName().getValue().endsWith(".80")) {
assertEquals("The doubleton rack has the wrong number of elements", 2,
nodes.size());
sawDoubleton = true;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java Sat Dec 17 00:31:46 2011
@@ -35,23 +35,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
public class Folder extends Configured implements Tool {
private long outputDuration = -1;
private long inputCycle = -1;
@@ -66,7 +55,7 @@ public class Folder extends Configured i
static final private Log LOG = LogFactory.getLog(Folder.class);
private DeskewedJobTraceReader reader = null;
- private JsonGenerator outGen = null;
+ private Outputter<LoggedJob> outGen = null;
private List<Path> tempPaths = new LinkedList<Path>();
@@ -171,25 +160,8 @@ public class Folder extends Configured i
skewBufferLength, !allowMissorting);
Path outPath = new Path(outputPathName);
- ObjectMapper outMapper = new ObjectMapper();
- outMapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory outFactory = outMapper.getJsonFactory();
- FileSystem outFS = outPath.getFileSystem(conf);
-
- CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(outPath);
- OutputStream output;
- Compressor compressor = null;
- if (codec != null) {
- compressor = CodecPool.getCompressor(codec);
- output = codec.createOutputStream(outFS.create(outPath), compressor);
- } else {
- output = outFS.create(outPath);
- }
-
- outGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
- outGen.useDefaultPrettyPrinter();
+ outGen = new DefaultOutputter<LoggedJob>();
+ outGen.init(outPath, conf);
tempDir =
tempDirName == null ? outPath.getParent() : new Path(tempDirName);
@@ -258,11 +230,6 @@ public class Folder extends Configured i
}
}
- ObjectMapper outMapper = new ObjectMapper();
- outMapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory outFactory = outMapper.getJsonFactory();
-
// we initialize an empty heap so if we take an error before establishing
// a real one the finally code goes through
Queue<Pair<LoggedJob, JobTraceReader>> heap =
@@ -310,8 +277,7 @@ public class Folder extends Configured i
long currentIntervalEnd = Long.MIN_VALUE;
Path nextSegment = null;
- OutputStream tempUncompOut = null;
- JsonGenerator tempGen = null;
+ Outputter<LoggedJob> tempGen = null;
if (debug) {
LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
@@ -333,7 +299,9 @@ public class Folder extends Configured i
if (tempGen != null) {
tempGen.close();
}
- for (int i = 0; i < 3 && tempUncompOut == null; ++i) {
+
+ nextSegment = null;
+ for (int i = 0; i < 3 && nextSegment == null; ++i) {
try {
nextSegment =
new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
@@ -347,7 +315,7 @@ public class Folder extends Configured i
try {
if (!fs.exists(nextSegment)) {
- tempUncompOut = fs.create(nextSegment, false);
+ break;
}
continue;
@@ -360,6 +328,10 @@ public class Folder extends Configured i
}
}
+ if (nextSegment == null) {
+ throw new RuntimeException("Failed to create a new file!");
+ }
+
if (debug) {
LOG.debug("Creating " + nextSegment
+ " for a job with a submit time of " + job.getSubmitTime());
@@ -369,23 +341,8 @@ public class Folder extends Configured i
tempPaths.add(nextSegment);
- CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(nextSegment);
- OutputStream output;
- Compressor compressor = null;
- if (codec != null) {
- compressor = CodecPool.getCompressor(codec);
- output = codec.createOutputStream(tempUncompOut, compressor);
- } else {
- output = tempUncompOut;
- }
-
- tempUncompOut = null;
-
- tempGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
- if (debug) {
- tempGen.useDefaultPrettyPrinter();
- }
+ tempGen = new DefaultOutputter<LoggedJob>();
+ tempGen.init(nextSegment, conf);
long currentIntervalNumber =
(job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
@@ -396,7 +353,9 @@ public class Folder extends Configured i
// the temp files contain UDadjusted times, but each temp file's
// content is in the same input cycle interval.
- tempGen.writeObject(job);
+ if (tempGen != null) {
+ tempGen.output(job);
+ }
job = reader.nextJob();
}
@@ -541,11 +500,11 @@ public class Folder extends Configured i
private void maybeOutput(LoggedJob job) throws IOException {
for (int i = 0; i < transcriptionRateInteger; ++i) {
- outGen.writeObject(job);
+ outGen.output(job);
}
if (random.nextDouble() < transcriptionRateFraction) {
- outGen.writeObject(job);
+ outGen.output(job);
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Sat Dec 17 00:31:46 2011
@@ -56,12 +56,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
/**
* This is the main class for rumen log mining functionality.
@@ -126,7 +121,7 @@ public class HadoopLogsAnalyzer extends
*/
private boolean omitTaskDetails = false;
- private JsonGenerator jobTraceGen = null;
+ private Outputter<LoggedJob> jobTraceGen = null;
private boolean prettyprintTrace = true;
@@ -148,7 +143,7 @@ public class HadoopLogsAnalyzer extends
private int[] attemptTimesPercentiles;
- private JsonGenerator topologyGen = null;
+ private Outputter<LoggedNetworkTopology> topologyGen = null;
private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>();
@@ -502,28 +497,12 @@ public class HadoopLogsAnalyzer extends
}
if (jobTraceFilename != null) {
- ObjectMapper jmapper = new ObjectMapper();
- jmapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory jfactory = jmapper.getJsonFactory();
- FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
- jobTraceGen =
- jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
- JsonEncoding.UTF8);
- if (prettyprintTrace) {
- jobTraceGen.useDefaultPrettyPrinter();
- }
+ jobTraceGen = new DefaultOutputter<LoggedJob>();
+ jobTraceGen.init(jobTraceFilename, getConf());
if (topologyFilename != null) {
- ObjectMapper tmapper = new ObjectMapper();
- tmapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory tfactory = tmapper.getJsonFactory();
- FileSystem topoFS = topologyFilename.getFileSystem(getConf());
- topologyGen =
- tfactory.createJsonGenerator(topoFS.create(topologyFilename),
- JsonEncoding.UTF8);
- topologyGen.useDefaultPrettyPrinter();
+ topologyGen = new DefaultOutputter<LoggedNetworkTopology>();
+ topologyGen.init(topologyFilename, getConf());
}
}
@@ -795,8 +774,8 @@ public class HadoopLogsAnalyzer extends
*/
if (jobID != null
&& jobTraceGen != null
- && (jobBeingTraced == null || !jobID.equals(jobBeingTraced
- .getJobID()))) {
+ && (jobBeingTraced == null
+ || !jobID.equals(jobBeingTraced.getJobID().toString()))) {
// push out the old job if there is one, even though it did't get
// mated
// with a conf.
@@ -1615,7 +1594,7 @@ public class HadoopLogsAnalyzer extends
private void maybeMateJobAndConf() throws IOException {
if (jobBeingTraced != null && jobconf != null
- && jobBeingTraced.getJobID().equals(jobconf.jobID)) {
+ && jobBeingTraced.getJobID().toString().equals(jobconf.jobID)) {
jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes);
jobBeingTraced.setQueue(jobconf.queue);
@@ -1692,9 +1671,7 @@ public class HadoopLogsAnalyzer extends
jobBeingTraced.setMapperTriesToSucceed(null);
}
- jobTraceGen.writeObject(jobBeingTraced);
-
- jobTraceGen.writeRaw("\n");
+ jobTraceGen.output(jobBeingTraced);
jobBeingTraced = null;
}
@@ -1792,7 +1769,7 @@ public class HadoopLogsAnalyzer extends
if (topologyGen != null) {
LoggedNetworkTopology topo =
new LoggedNetworkTopology(allHosts, "<root>", 0);
- topologyGen.writeObject(topo);
+ topologyGen.output(topo);
topologyGen.close();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Sat Dec 17 00:31:46 2011
@@ -27,6 +27,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
import org.apache.hadoop.util.StringUtils;
/**
@@ -83,11 +85,6 @@ public class JobBuilder {
* The number of splits a task can have, before we ignore them all.
*/
private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
- /**
- * The regular expression used to parse task attempt IDs in job tracker logs
- */
- private final static Pattern taskAttemptIDPattern =
- Pattern.compile(".*_([0-9]+)");
private int[] attemptTimesPercentiles = null;
@@ -262,7 +259,9 @@ public class JobBuilder {
finalized = true;
// set the conf
- result.setJobProperties(jobConfigurationParameters);
+ if (jobConfigurationParameters != null) {
+ result.setJobProperties(jobConfigurationParameters);
+ }
// initialize all the per-job statistics gathering places
Histogram[] successfulMapAttemptTimes =
@@ -314,20 +313,10 @@ public class JobBuilder {
}
}
- String attemptID = attempt.getAttemptID();
+ TaskAttemptID attemptID = attempt.getAttemptID();
if (attemptID != null) {
- Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
-
- if (matcher.matches()) {
- String attemptNumberString = matcher.group(1);
-
- if (attemptNumberString != null) {
- int attemptNumber = Integer.parseInt(attemptNumberString);
-
- successfulNthMapperAttempts.enter(attemptNumber);
- }
- }
+ successfulNthMapperAttempts.enter(attemptID.getId());
}
} else {
if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Sat Dec 17 00:31:46 2011
@@ -21,10 +21,16 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.mapreduce.ID;
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.serializers.DefaultRumenSerializer;
+import org.apache.hadoop.tools.rumen.serializers.ObjectStringSerializer;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.module.SimpleModule;
/**
* Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
@@ -37,6 +43,19 @@ public class JsonObjectMapperWriter<T> i
ObjectMapper mapper = new ObjectMapper();
mapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+
+ // define a module
+ SimpleModule module = new SimpleModule("Default Serializer",
+ new Version(0, 1, 1, "FINAL"));
+ // add various serializers to the module
+ // add default (all-pass) serializer for all rumen specific data types
+ module.addSerializer(DataType.class, new DefaultRumenSerializer());
+ // add a serializer to use object.toString() while serializing
+ module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
+
+ // register the module with the object-mapper
+ mapper.registerModule(module);
+
mapper.getJsonFactory();
writer = mapper.getJsonFactory().createJsonGenerator(
output, JsonEncoding.UTF8);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Sat Dec 17 00:31:46 2011
@@ -27,6 +27,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.rumen.datatypes.*;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -50,8 +52,8 @@ public class LoggedJob implements DeepCo
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- String jobID;
- String user;
+ JobID jobID;
+ UserName user;
long computonsPerMapInputByte = -1L;
long computonsPerMapOutputByte = -1L;
long computonsPerReduceInputByte = -1L;
@@ -80,9 +82,9 @@ public class LoggedJob implements DeepCo
LoggedDiscreteCDF successfulReduceAttemptCDF;
LoggedDiscreteCDF failedReduceAttemptCDF;
- String queue = null;
+ QueueName queue = null;
- String jobName = null;
+ JobName jobName = null;
int clusterMapMB = -1;
int clusterReduceMB = -1;
@@ -94,7 +96,7 @@ public class LoggedJob implements DeepCo
double[] mapperTriesToSucceed;
double failedMapperFraction; // !!!!!
- private Properties jobProperties = new Properties();
+ private JobProperties jobProperties = new JobProperties();
LoggedJob() {
@@ -110,13 +112,13 @@ public class LoggedJob implements DeepCo
* Set the configuration properties of the job.
*/
void setJobProperties(Properties conf) {
- this.jobProperties = conf;
+ this.jobProperties = new JobProperties(conf);
}
/**
* Get the configuration properties of the job.
*/
- public Properties getJobProperties() {
+ public JobProperties getJobProperties() {
return jobProperties;
}
@@ -138,7 +140,6 @@ public class LoggedJob implements DeepCo
}
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -149,20 +150,20 @@ public class LoggedJob implements DeepCo
}
}
- public String getUser() {
+ public UserName getUser() {
return user;
}
void setUser(String user) {
- this.user = user;
+ this.user = new UserName(user);
}
- public String getJobID() {
+ public JobID getJobID() {
return jobID;
}
void setJobID(String jobID) {
- this.jobID = jobID;
+ this.jobID = JobID.forName(jobID);
}
public JobPriority getPriority() {
@@ -359,20 +360,20 @@ public class LoggedJob implements DeepCo
this.relativeTime = relativeTime;
}
- public String getQueue() {
+ public QueueName getQueue() {
return queue;
}
void setQueue(String queue) {
- this.queue = queue;
+ this.queue = new QueueName(queue);
}
- public String getJobName() {
+ public JobName getJobName() {
return jobName;
}
void setJobName(String jobName) {
- this.jobName = jobName;
+ this.jobName = new JobName(jobName);
}
public int getClusterMapMB() {
@@ -555,33 +556,52 @@ public class LoggedJob implements DeepCo
}
}
- private void compareJobProperties(Properties prop1, Properties prop2,
+ private void compareJobProperties(JobProperties jprop1, JobProperties jprop2,
TreePath loc, String eltname)
throws DeepInequalityException {
- if (prop1 == null && prop2 == null) {
+ if (jprop1 == null && jprop2 == null) {
return;
}
- if (prop1 == null || prop2 == null) {
- throw new DeepInequalityException(eltname + " miscompared [null]",
+ if (jprop1 == null || jprop2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared",
new TreePath(loc, eltname));
}
+ Properties prop1 = jprop1.getValue();
+ Properties prop2 = jprop2.getValue();
+
if (prop1.size() != prop2.size()) {
throw new DeepInequalityException(eltname + " miscompared [size]",
new TreePath(loc, eltname));
}
for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
- Object v1 = entry.getValue();
- Object v2 = prop2.get(entry.getKey());
- if (v1 == null || v2 == null || !v1.equals(v2)) {
- throw new DeepInequalityException(
- eltname + " miscompared for value of key : "
- + entry.getKey().toString(),
- new TreePath(loc, eltname));
- }
+ String v1 = entry.getValue().toString();
+ String v2 = prop2.get(entry.getKey()).toString();
+ compare1(v1, v2, new TreePath(loc, eltname), "key:" + entry.getKey());
+ }
+ }
+
+ private void compare1(DataType<String> c1, DataType<String> c2, TreePath loc,
+ String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
}
+
+ if (c1 == null || c2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared",
+ new TreePath(loc, eltname));
+ }
+ TreePath dtPath = new TreePath(loc, eltname);
+
+ if (!c1.getClass().getName().equals(c2.getClass().getName())) {
+ throw new DeepInequalityException(eltname + " miscompared",
+ new TreePath(dtPath, "class"));
+ }
+
+ compare1(c1.getValue(), c2.getValue(), dtPath, "value");
}
public void deepCompare(DeepCompare comparand, TreePath loc)
@@ -592,7 +612,7 @@ public class LoggedJob implements DeepCo
LoggedJob other = (LoggedJob) comparand;
- compare1(jobID, other.jobID, loc, "jobID");
+ compare1(jobID.toString(), other.jobID.toString(), loc, "jobID");
compare1(user, other.user, loc, "user");
compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Sat Dec 17 00:31:46 2011
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -44,20 +45,20 @@ import org.codehaus.jackson.annotate.Jso
*
*/
public class LoggedLocation implements DeepCompare {
- static final Map<List<String>, List<String>> layersCache =
- new HashMap<List<String>, List<String>>();
+ static final Map<List<String>, List<NodeName>> layersCache =
+ new HashMap<List<String>, List<NodeName>>();
/**
* The full path from the root of the network to the host.
*
* NOTE that this assumes that the network topology is a tree.
*/
- List<String> layers = Collections.emptyList();
+ List<NodeName> layers = Collections.emptyList();
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- public List<String> getLayers() {
+ public List<NodeName> getLayers() {
return layers;
}
@@ -66,16 +67,17 @@ public class LoggedLocation implements D
this.layers = Collections.emptyList();
} else {
synchronized (layersCache) {
- List<String> found = layersCache.get(layers);
+ List<NodeName> found = layersCache.get(layers);
if (found == null) {
// make a copy with interned string.
- List<String> clone = new ArrayList<String>(layers.size());
- for (String s : layers) {
- clone.add(s.intern());
- }
+ List<NodeName> clone = new ArrayList<NodeName>(layers.size());
+ clone.add(new NodeName(layers.get(0).intern(), null));
+ clone.add(new NodeName(null, layers.get(1).intern()));
+
// making it read-only as we are sharing them.
- List<String> readonlyLayers = Collections.unmodifiableList(clone);
- layersCache.put(readonlyLayers, readonlyLayers);
+ List<NodeName> readonlyLayers = Collections.unmodifiableList(clone);
+ List<String> readonlyLayersKey = Collections.unmodifiableList(layers);
+ layersCache.put(readonlyLayersKey, readonlyLayers);
this.layers = readonlyLayers;
} else {
this.layers = found;
@@ -84,7 +86,6 @@ public class LoggedLocation implements D
}
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -96,17 +97,33 @@ public class LoggedLocation implements D
}
// I'll treat this as an atomic object type
- private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
- String eltname) throws DeepInequalityException {
+ private void compareStrings(List<NodeName> c1, List<NodeName> c2,
+ TreePath loc, String eltname)
+ throws DeepInequalityException {
if (c1 == null && c2 == null) {
return;
}
TreePath recursePath = new TreePath(loc, eltname);
- if (c1 == null || c2 == null || !c1.equals(c2)) {
+ if (c1 == null || c2 == null || (c1.size() != c2.size())) {
throw new DeepInequalityException(eltname + " miscompared", recursePath);
}
+
+ for (NodeName n1 : c1) {
+ boolean found = false;
+ for (NodeName n2 : c2) {
+ if (n1.getValue().equals(n2.getValue())) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new DeepInequalityException(eltname
+ + " miscompared [" + n1.getValue() +"]", recursePath);
+ }
+ }
}
public void deepCompare(DeepCompare comparand, TreePath loc)
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Sat Dec 17 00:31:46 2011
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.ArrayList;
import java.util.Comparator;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -40,7 +41,7 @@ import org.codehaus.jackson.annotate.Jso
*
*/
public class LoggedNetworkTopology implements DeepCompare {
- String name;
+ NodeName name;
List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
static private Set<String> alreadySeenAnySetterAttributes =
@@ -50,7 +51,6 @@ public class LoggedNetworkTopology imple
super();
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -70,7 +70,7 @@ public class LoggedNetworkTopology imple
*/
static class TopoSort implements Comparator<LoggedNetworkTopology> {
public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
- return t1.name.compareTo(t2.name);
+ return t1.name.getValue().compareTo(t2.name.getValue());
}
}
@@ -83,8 +83,11 @@ public class LoggedNetworkTopology imple
* the level number
*/
LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
-
- this.name = name;
+ if (name == null) {
+ this.name = NodeName.ROOT;
+ } else {
+ this.name = new NodeName(name);
+ }
this.children = null;
if (level < ParsedHost.numberOfDistances() - 1) {
@@ -120,15 +123,15 @@ public class LoggedNetworkTopology imple
}
LoggedNetworkTopology(Set<ParsedHost> hosts) {
- this(hosts, "<root>", 0);
+ this(hosts, null, 0);
}
- public String getName() {
+ public NodeName getName() {
return name;
}
void setName(String name) {
- this.name = name;
+ this.name = new NodeName(name);
}
public List<LoggedNetworkTopology> getChildren() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Sat Dec 17 00:31:46 2011
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.mapreduce.jobhistory.Events;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
@@ -44,7 +44,7 @@ public class LoggedTask implements DeepC
long inputRecords = -1L;
long outputBytes = -1L;
long outputRecords = -1L;
- String taskID;
+ TaskID taskID;
long startTime = -1L;
long finishTime = -1L;
Pre21JobHistoryConstants.Values taskType;
@@ -55,7 +55,6 @@ public class LoggedTask implements DeepC
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -111,12 +110,12 @@ public class LoggedTask implements DeepC
this.outputRecords = outputRecords;
}
- public String getTaskID() {
+ public TaskID getTaskID() {
return taskID;
}
void setTaskID(String taskID) {
- this.taskID = taskID;
+ this.taskID = TaskID.forName(taskID);
}
public long getStartTime() {
@@ -357,7 +356,7 @@ public class LoggedTask implements DeepC
compare1(outputBytes, other.outputBytes, loc, "outputBytes");
compare1(outputRecords, other.outputRecords, loc, "outputRecords");
- compare1(taskID, other.taskID, loc, "taskID");
+ compare1(taskID.toString(), other.taskID.toString(), loc, "taskID");
compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime");
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Sat Dec 17 00:31:46 2011
@@ -30,9 +30,11 @@ import org.codehaus.jackson.annotate.Jso
// the Jackson implementation of JSON doesn't handle a
// superclass-valued field.
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
/**
* A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
@@ -44,11 +46,11 @@ import org.apache.hadoop.mapreduce.jobhi
*/
public class LoggedTaskAttempt implements DeepCompare {
- String attemptID;
+ TaskAttemptID attemptID;
Pre21JobHistoryConstants.Values result;
long startTime = -1L;
long finishTime = -1L;
- String hostName;
+ NodeName hostName;
long hdfsBytesRead = -1L;
long hdfsBytesWritten = -1L;
@@ -188,7 +190,6 @@ public class LoggedTaskAttempt implement
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -213,7 +214,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < clockSplits.length; ++i) {
result.add(clockSplits[i]);
}
-
+
this.clockSplits = result;
}
@@ -231,7 +232,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < cpuUsages.length; ++i) {
result.add(cpuUsages[i]);
}
-
+
this.cpuUsages = result;
}
@@ -249,7 +250,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < vMemKbytes.length; ++i) {
result.add(vMemKbytes[i]);
}
-
+
this.vMemKbytes = result;
}
@@ -267,7 +268,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < physMemKbytes.length; ++i) {
result.add(physMemKbytes[i]);
}
-
+
this.physMemKbytes = result;
}
@@ -292,12 +293,12 @@ public class LoggedTaskAttempt implement
this.sortFinished = sortFinished;
}
- public String getAttemptID() {
+ public TaskAttemptID getAttemptID() {
return attemptID;
}
void setAttemptID(String attemptID) {
- this.attemptID = attemptID;
+ this.attemptID = TaskAttemptID.forName(attemptID);
}
public Pre21JobHistoryConstants.Values getResult() {
@@ -324,15 +325,17 @@ public class LoggedTaskAttempt implement
this.finishTime = finishTime;
}
- public String getHostName() {
+ public NodeName getHostName() {
return hostName;
}
+ // This is needed for JSON deserialization
void setHostName(String hostName) {
- this.hostName = hostName;
+ this.hostName = hostName == null ? null : new NodeName(hostName);
}
-
- // hostName is saved in the format rackName/NodeName
+
+ // In job-history, hostName is saved in the format rackName/NodeName
+ //TODO this is a hack! The '/' handling needs fixing.
void setHostName(String hostName, String rackName) {
if (hostName == null || hostName.length() == 0) {
throw new RuntimeException("Invalid entry! Missing hostname");
@@ -649,6 +652,20 @@ public class LoggedTaskAttempt implement
}
}
+ private void compare1(NodeName c1, NodeName c2, TreePath loc, String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
+ }
+
+ if (c1 == null || c2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+ loc, eltname));
+ }
+
+ compare1(c1.getValue(), c2.getValue(), new TreePath(loc, eltname), "value");
+ }
+
private void compare1(long c1, long c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 != c2) {
@@ -709,7 +726,7 @@ public class LoggedTaskAttempt implement
LoggedTaskAttempt other = (LoggedTaskAttempt) comparand;
- compare1(attemptID, other.attemptID, loc, "attemptID");
+ compare1(attemptID.toString(), other.attemptID.toString(), loc, "attemptID");
compare1(result, other.result, loc, "result");
compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime");
@@ -745,4 +762,4 @@ public class LoggedTaskAttempt implement
compare1(vMemKbytes, other.vMemKbytes, loc, "vMemKbytes");
compare1(physMemKbytes, other.physMemKbytes, loc, "physMemKbytes");
}
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Sat Dec 17 00:31:46 2011
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
-class ParsedHost {
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
+
+public class ParsedHost {
private final String rackName;
private final String nodeName;
@@ -70,10 +72,10 @@ class ParsedHost {
}
public ParsedHost(LoggedLocation loc) {
- List<String> coordinates = loc.getLayers();
+ List<NodeName> coordinates = loc.getLayers();
- rackName = coordinates.get(0);
- nodeName = coordinates.get(1);
+ rackName = coordinates.get(0).getRackName();
+ nodeName = coordinates.get(1).getHostName();
}
LoggedLocation makeLoggedLocation() {
@@ -89,11 +91,11 @@ class ParsedHost {
return result;
}
- String getNodeName() {
+ public String getNodeName() {
return nodeName;
}
- String getRackName() {
+ public String getRackName() {
return rackName;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Sat Dec 17 00:31:46 2011
@@ -124,15 +124,16 @@ public class ZombieCluster extends Abstr
int level = levelMapping.get(n);
Node current;
if (level == leafLevel) { // a machine node
- MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
+ MachineNode.Builder builder =
+ new MachineNode.Builder(n.getName().getValue(), level);
if (defaultNode != null) {
builder.cloneFrom(defaultNode);
}
current = builder.build();
} else {
current = (level == leafLevel - 1)
- ? new RackNode(n.getName(), level) :
- new Node(n.getName(), level);
+ ? new RackNode(n.getName().getValue(), level) :
+ new Node(n.getName().getValue(), level);
path[level] = current;
// Add all children to the front of the queue.
for (LoggedNetworkTopology child : n.getChildren()) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1215367&r1=1215366&r2=1215367&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Sat Dec 17 00:31:46 2011
@@ -28,12 +28,14 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
/**
@@ -128,7 +130,7 @@ public class ZombieJob implements JobSto
// file, are added first because the specialized values obtained from
// Rumen should override the job conf values.
//
- for (Map.Entry<Object, Object> entry : job.getJobProperties().entrySet()) {
+ for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
jobConf.set(entry.getKey().toString(), entry.getValue().toString());
}
@@ -161,12 +163,12 @@ public class ZombieJob implements JobSto
List<String> hostList = new ArrayList<String>();
if (locations != null) {
for (LoggedLocation location : locations) {
- List<String> layers = location.getLayers();
+ List<NodeName> layers = location.getLayers();
if (layers.size() == 0) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
continue;
}
- String host = layers.get(layers.size() - 1);
+ String host = layers.get(layers.size() - 1).getValue();
if (host == null) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
continue;
@@ -226,20 +228,20 @@ public class ZombieJob implements JobSto
@Override
public String getName() {
- String jobName = job.getJobName();
+ JobName jobName = job.getJobName();
if (jobName == null) {
return "(name unknown)";
} else {
- return jobName;
+ return jobName.getValue();
}
}
@Override
public JobID getJobID() {
- return JobID.forName(getLoggedJob().getJobID());
+ return getLoggedJob().getJobID();
}
- private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
+ private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
if (oldVal == -1) {
LOG.warn(name +" not defined for "+id);
return defaultVal;
@@ -269,8 +271,10 @@ public class ZombieJob implements JobSto
@Override
public String getQueueName() {
- String queue = job.getQueue();
- return (queue == null)? JobConf.DEFAULT_QUEUE_NAME : queue;
+ QueueName queue = job.getQueue();
+ return (queue == null || queue.getValue() == null)
+ ? JobConf.DEFAULT_QUEUE_NAME
+ : queue.getValue();
}
/**
@@ -357,13 +361,12 @@ public class ZombieJob implements JobSto
for (LoggedTask map : job.getMapTasks()) {
map = sanitizeLoggedTask(map);
if (map != null) {
- loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
+ loggedTaskMap.put(maskTaskID(map.taskID), map);
for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
if (mapAttempt != null) {
- TaskAttemptID id = TaskAttemptID.forName(mapAttempt
- .getAttemptID());
+ TaskAttemptID id = mapAttempt.getAttemptID();
loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
}
}
@@ -372,13 +375,12 @@ public class ZombieJob implements JobSto
for (LoggedTask reduce : job.getReduceTasks()) {
reduce = sanitizeLoggedTask(reduce);
if (reduce != null) {
- loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
+ loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);
for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
if (reduceAttempt != null) {
- TaskAttemptID id = TaskAttemptID.forName(reduceAttempt
- .getAttemptID());
+ TaskAttemptID id = reduceAttempt.getAttemptID();
loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
}
}
@@ -391,8 +393,10 @@ public class ZombieJob implements JobSto
@Override
public String getUser() {
- String retval = job.getUser();
- return (retval==null)?"(unknown)":retval;
+ UserName retval = job.getUser();
+ return (retval == null || retval.getValue() == null)
+ ? "(unknown)"
+ : retval.getValue();
}
/**
@@ -511,7 +515,7 @@ public class ZombieJob implements JobSto
}
}
- private long sanitizeTaskRuntime(long time, String id) {
+ private long sanitizeTaskRuntime(long time, ID id) {
if (time < 0) {
LOG.warn("Negative running time for task "+id+": "+time);
return 100L; // set default to 100ms.
@@ -547,7 +551,7 @@ public class ZombieJob implements JobSto
private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
int distance = cluster.getMaximumDistance();
- String rackHostName = loggedAttempt.getHostName();
+ String rackHostName = loggedAttempt.getHostName().getValue();
if (rackHostName == null) {
return distance;
}
@@ -558,11 +562,11 @@ public class ZombieJob implements JobSto
List<LoggedLocation> locations = loggedTask.getPreferredLocations();
if (locations != null) {
for (LoggedLocation location : locations) {
- List<String> layers = location.getLayers();
+ List<NodeName> layers = location.getLayers();
if ((layers == null) || (layers.isEmpty())) {
continue;
}
- String dataNodeName = layers.get(layers.size()-1);
+ String dataNodeName = layers.get(layers.size()-1).getValue();
MachineNode dataNode = cluster.getMachineByName(dataNodeName);
if (dataNode != null) {
distance = Math.min(distance, cluster.distance(mn, dataNode));
@@ -690,8 +694,8 @@ public class ZombieJob implements JobSto
private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
int taskAttemptNumber) {
- return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
- taskType, taskNumber), taskAttemptNumber);
+ return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber),
+ taskAttemptNumber);
}
private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
@@ -704,7 +708,7 @@ public class ZombieJob implements JobSto
state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
runtime = makeUpMapRuntime(state, locality);
runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
- taskNumber, taskAttemptNumber).toString());
+ taskNumber, taskAttemptNumber));
TaskAttemptInfo tai
= new MapTaskAttemptInfo(state, taskInfo, runtime, null);
return tai;
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/webapps/job/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 00:31:46 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:1152502-1214939
+/hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:1152502-1215364
/hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112
/hadoop/core/trunk/src/webapps/job:776175-785643