You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2011/12/16 15:21:00 UTC
svn commit: r1215141 [3/4] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./ ivy/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/docs/src/documentation/content/xdocs/
src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org...
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/Folder.java Fri Dec 16 14:20:58 2011
@@ -35,23 +35,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
public class Folder extends Configured implements Tool {
private long outputDuration = -1;
private long inputCycle = -1;
@@ -66,7 +55,7 @@ public class Folder extends Configured i
static final private Log LOG = LogFactory.getLog(Folder.class);
private DeskewedJobTraceReader reader = null;
- private JsonGenerator outGen = null;
+ private Outputter<LoggedJob> outGen = null;
private List<Path> tempPaths = new LinkedList<Path>();
@@ -171,25 +160,8 @@ public class Folder extends Configured i
skewBufferLength, !allowMissorting);
Path outPath = new Path(outputPathName);
- ObjectMapper outMapper = new ObjectMapper();
- outMapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory outFactory = outMapper.getJsonFactory();
- FileSystem outFS = outPath.getFileSystem(conf);
-
- CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(outPath);
- OutputStream output;
- Compressor compressor = null;
- if (codec != null) {
- compressor = CodecPool.getCompressor(codec);
- output = codec.createOutputStream(outFS.create(outPath), compressor);
- } else {
- output = outFS.create(outPath);
- }
-
- outGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
- outGen.useDefaultPrettyPrinter();
+ outGen = new DefaultOutputter<LoggedJob>();
+ outGen.init(outPath, conf);
tempDir =
tempDirName == null ? outPath.getParent() : new Path(tempDirName);
@@ -258,11 +230,6 @@ public class Folder extends Configured i
}
}
- ObjectMapper outMapper = new ObjectMapper();
- outMapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory outFactory = outMapper.getJsonFactory();
-
// we initialize an empty heap so if we take an error before establishing
// a real one the finally code goes through
Queue<Pair<LoggedJob, JobTraceReader>> heap =
@@ -310,8 +277,7 @@ public class Folder extends Configured i
long currentIntervalEnd = Long.MIN_VALUE;
Path nextSegment = null;
- OutputStream tempUncompOut = null;
- JsonGenerator tempGen = null;
+ Outputter<LoggedJob> tempGen = null;
if (debug) {
LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
@@ -333,7 +299,9 @@ public class Folder extends Configured i
if (tempGen != null) {
tempGen.close();
}
- for (int i = 0; i < 3 && tempUncompOut == null; ++i) {
+
+ nextSegment = null;
+ for (int i = 0; i < 3 && nextSegment == null; ++i) {
try {
nextSegment =
new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
@@ -347,7 +315,7 @@ public class Folder extends Configured i
try {
if (!fs.exists(nextSegment)) {
- tempUncompOut = fs.create(nextSegment, false);
+ break;
}
continue;
@@ -360,6 +328,10 @@ public class Folder extends Configured i
}
}
+ if (nextSegment == null) {
+ throw new RuntimeException("Failed to create a new file!");
+ }
+
if (debug) {
LOG.debug("Creating " + nextSegment
+ " for a job with a submit time of " + job.getSubmitTime());
@@ -369,23 +341,8 @@ public class Folder extends Configured i
tempPaths.add(nextSegment);
- CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(nextSegment);
- OutputStream output;
- Compressor compressor = null;
- if (codec != null) {
- compressor = CodecPool.getCompressor(codec);
- output = codec.createOutputStream(tempUncompOut, compressor);
- } else {
- output = tempUncompOut;
- }
-
- tempUncompOut = null;
-
- tempGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
- if (debug) {
- tempGen.useDefaultPrettyPrinter();
- }
+ tempGen = new DefaultOutputter<LoggedJob>();
+ tempGen.init(nextSegment, conf);
long currentIntervalNumber =
(job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
@@ -396,7 +353,9 @@ public class Folder extends Configured i
// the temp files contain UDadjusted times, but each temp file's
// content is in the same input cycle interval.
- tempGen.writeObject(job);
+ if (tempGen != null) {
+ tempGen.output(job);
+ }
job = reader.nextJob();
}
@@ -541,11 +500,11 @@ public class Folder extends Configured i
private void maybeOutput(LoggedJob job) throws IOException {
for (int i = 0; i < transcriptionRateInteger; ++i) {
- outGen.writeObject(job);
+ outGen.output(job);
}
if (random.nextDouble() < transcriptionRateFraction) {
- outGen.writeObject(job);
+ outGen.output(job);
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Fri Dec 16 14:20:58 2011
@@ -56,12 +56,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
/**
* This is the main class for rumen log mining functionality.
@@ -126,7 +121,7 @@ public class HadoopLogsAnalyzer extends
*/
private boolean omitTaskDetails = false;
- private JsonGenerator jobTraceGen = null;
+ private Outputter<LoggedJob> jobTraceGen = null;
private boolean prettyprintTrace = true;
@@ -148,7 +143,7 @@ public class HadoopLogsAnalyzer extends
private int[] attemptTimesPercentiles;
- private JsonGenerator topologyGen = null;
+ private Outputter<LoggedNetworkTopology> topologyGen = null;
private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>();
@@ -502,28 +497,12 @@ public class HadoopLogsAnalyzer extends
}
if (jobTraceFilename != null) {
- ObjectMapper jmapper = new ObjectMapper();
- jmapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory jfactory = jmapper.getJsonFactory();
- FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
- jobTraceGen =
- jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
- JsonEncoding.UTF8);
- if (prettyprintTrace) {
- jobTraceGen.useDefaultPrettyPrinter();
- }
+ jobTraceGen = new DefaultOutputter<LoggedJob>();
+ jobTraceGen.init(jobTraceFilename, getConf());
if (topologyFilename != null) {
- ObjectMapper tmapper = new ObjectMapper();
- tmapper.configure(
- SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- JsonFactory tfactory = tmapper.getJsonFactory();
- FileSystem topoFS = topologyFilename.getFileSystem(getConf());
- topologyGen =
- tfactory.createJsonGenerator(topoFS.create(topologyFilename),
- JsonEncoding.UTF8);
- topologyGen.useDefaultPrettyPrinter();
+ topologyGen = new DefaultOutputter<LoggedNetworkTopology>();
+ topologyGen.init(topologyFilename, getConf());
}
}
@@ -795,8 +774,8 @@ public class HadoopLogsAnalyzer extends
*/
if (jobID != null
&& jobTraceGen != null
- && (jobBeingTraced == null || !jobID.equals(jobBeingTraced
- .getJobID()))) {
+ && (jobBeingTraced == null
+ || !jobID.equals(jobBeingTraced.getJobID().toString()))) {
// push out the old job if there is one, even though it did't get
// mated
// with a conf.
@@ -1615,7 +1594,7 @@ public class HadoopLogsAnalyzer extends
private void maybeMateJobAndConf() throws IOException {
if (jobBeingTraced != null && jobconf != null
- && jobBeingTraced.getJobID().equals(jobconf.jobID)) {
+ && jobBeingTraced.getJobID().toString().equals(jobconf.jobID)) {
jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes);
jobBeingTraced.setQueue(jobconf.queue);
@@ -1692,9 +1671,7 @@ public class HadoopLogsAnalyzer extends
jobBeingTraced.setMapperTriesToSucceed(null);
}
- jobTraceGen.writeObject(jobBeingTraced);
-
- jobTraceGen.writeRaw("\n");
+ jobTraceGen.output(jobBeingTraced);
jobBeingTraced = null;
}
@@ -1792,7 +1769,7 @@ public class HadoopLogsAnalyzer extends
if (topologyGen != null) {
LoggedNetworkTopology topo =
new LoggedNetworkTopology(allHosts, "<root>", 0);
- topologyGen.writeObject(topo);
+ topologyGen.output(topo);
topologyGen.close();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Dec 16 14:20:58 2011
@@ -27,6 +27,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
import org.apache.hadoop.util.StringUtils;
/**
@@ -83,11 +85,6 @@ public class JobBuilder {
* The number of splits a task can have, before we ignore them all.
*/
private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
- /**
- * The regular expression used to parse task attempt IDs in job tracker logs
- */
- private final static Pattern taskAttemptIDPattern =
- Pattern.compile(".*_([0-9]+)");
private int[] attemptTimesPercentiles = null;
@@ -262,7 +259,9 @@ public class JobBuilder {
finalized = true;
// set the conf
- result.setJobProperties(jobConfigurationParameters);
+ if (jobConfigurationParameters != null) {
+ result.setJobProperties(jobConfigurationParameters);
+ }
// initialize all the per-job statistics gathering places
Histogram[] successfulMapAttemptTimes =
@@ -314,20 +313,10 @@ public class JobBuilder {
}
}
- String attemptID = attempt.getAttemptID();
+ TaskAttemptID attemptID = attempt.getAttemptID();
if (attemptID != null) {
- Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
-
- if (matcher.matches()) {
- String attemptNumberString = matcher.group(1);
-
- if (attemptNumberString != null) {
- int attemptNumber = Integer.parseInt(attemptNumberString);
-
- successfulNthMapperAttempts.enter(attemptNumber);
- }
- }
+ successfulNthMapperAttempts.enter(attemptID.getId());
}
} else {
if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Fri Dec 16 14:20:58 2011
@@ -21,10 +21,16 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.mapreduce.ID;
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.serializers.DefaultRumenSerializer;
+import org.apache.hadoop.tools.rumen.serializers.ObjectStringSerializer;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.module.SimpleModule;
/**
* Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
@@ -37,6 +43,19 @@ public class JsonObjectMapperWriter<T> i
ObjectMapper mapper = new ObjectMapper();
mapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+
+ // define a module
+ SimpleModule module = new SimpleModule("Default Serializer",
+ new Version(0, 1, 1, "FINAL"));
+ // add various serializers to the module
+ // add default (all-pass) serializer for all rumen specific data types
+ module.addSerializer(DataType.class, new DefaultRumenSerializer());
+ // add a serializer to use object.toString() while serializing
+ module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
+
+ // register the module with the object-mapper
+ mapper.registerModule(module);
+
mapper.getJsonFactory();
writer = mapper.getJsonFactory().createJsonGenerator(
output, JsonEncoding.UTF8);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Dec 16 14:20:58 2011
@@ -27,6 +27,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.rumen.datatypes.*;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -50,8 +52,8 @@ public class LoggedJob implements DeepCo
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- String jobID;
- String user;
+ JobID jobID;
+ UserName user;
long computonsPerMapInputByte = -1L;
long computonsPerMapOutputByte = -1L;
long computonsPerReduceInputByte = -1L;
@@ -80,9 +82,9 @@ public class LoggedJob implements DeepCo
LoggedDiscreteCDF successfulReduceAttemptCDF;
LoggedDiscreteCDF failedReduceAttemptCDF;
- String queue = null;
+ QueueName queue = null;
- String jobName = null;
+ JobName jobName = null;
int clusterMapMB = -1;
int clusterReduceMB = -1;
@@ -94,7 +96,7 @@ public class LoggedJob implements DeepCo
double[] mapperTriesToSucceed;
double failedMapperFraction; // !!!!!
- private Properties jobProperties = new Properties();
+ private JobProperties jobProperties = new JobProperties();
LoggedJob() {
@@ -110,13 +112,13 @@ public class LoggedJob implements DeepCo
* Set the configuration properties of the job.
*/
void setJobProperties(Properties conf) {
- this.jobProperties = conf;
+ this.jobProperties = new JobProperties(conf);
}
/**
* Get the configuration properties of the job.
*/
- public Properties getJobProperties() {
+ public JobProperties getJobProperties() {
return jobProperties;
}
@@ -138,7 +140,6 @@ public class LoggedJob implements DeepCo
}
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -149,20 +150,20 @@ public class LoggedJob implements DeepCo
}
}
- public String getUser() {
+ public UserName getUser() {
return user;
}
void setUser(String user) {
- this.user = user;
+ this.user = new UserName(user);
}
- public String getJobID() {
+ public JobID getJobID() {
return jobID;
}
void setJobID(String jobID) {
- this.jobID = jobID;
+ this.jobID = JobID.forName(jobID);
}
public JobPriority getPriority() {
@@ -359,20 +360,20 @@ public class LoggedJob implements DeepCo
this.relativeTime = relativeTime;
}
- public String getQueue() {
+ public QueueName getQueue() {
return queue;
}
void setQueue(String queue) {
- this.queue = queue;
+ this.queue = new QueueName(queue);
}
- public String getJobName() {
+ public JobName getJobName() {
return jobName;
}
void setJobName(String jobName) {
- this.jobName = jobName;
+ this.jobName = new JobName(jobName);
}
public int getClusterMapMB() {
@@ -555,33 +556,52 @@ public class LoggedJob implements DeepCo
}
}
- private void compareJobProperties(Properties prop1, Properties prop2,
+ private void compareJobProperties(JobProperties jprop1, JobProperties jprop2,
TreePath loc, String eltname)
throws DeepInequalityException {
- if (prop1 == null && prop2 == null) {
+ if (jprop1 == null && jprop2 == null) {
return;
}
- if (prop1 == null || prop2 == null) {
- throw new DeepInequalityException(eltname + " miscompared [null]",
+ if (jprop1 == null || jprop2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared",
new TreePath(loc, eltname));
}
+ Properties prop1 = jprop1.getValue();
+ Properties prop2 = jprop2.getValue();
+
if (prop1.size() != prop2.size()) {
throw new DeepInequalityException(eltname + " miscompared [size]",
new TreePath(loc, eltname));
}
for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
- Object v1 = entry.getValue();
- Object v2 = prop2.get(entry.getKey());
- if (v1 == null || v2 == null || !v1.equals(v2)) {
- throw new DeepInequalityException(
- eltname + " miscompared for value of key : "
- + entry.getKey().toString(),
- new TreePath(loc, eltname));
- }
+ String v1 = entry.getValue().toString();
+ String v2 = prop2.get(entry.getKey()).toString();
+ compare1(v1, v2, new TreePath(loc, eltname), "key:" + entry.getKey());
+ }
+ }
+
+ private void compare1(DataType<String> c1, DataType<String> c2, TreePath loc,
+ String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
}
+
+ if (c1 == null || c2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared",
+ new TreePath(loc, eltname));
+ }
+ TreePath dtPath = new TreePath(loc, eltname);
+
+ if (!c1.getClass().getName().equals(c2.getClass().getName())) {
+ throw new DeepInequalityException(eltname + " miscompared",
+ new TreePath(dtPath, "class"));
+ }
+
+ compare1(c1.getValue(), c2.getValue(), dtPath, "value");
}
public void deepCompare(DeepCompare comparand, TreePath loc)
@@ -592,7 +612,7 @@ public class LoggedJob implements DeepCo
LoggedJob other = (LoggedJob) comparand;
- compare1(jobID, other.jobID, loc, "jobID");
+ compare1(jobID.toString(), other.jobID.toString(), loc, "jobID");
compare1(user, other.user, loc, "user");
compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Fri Dec 16 14:20:58 2011
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -44,20 +45,20 @@ import org.codehaus.jackson.annotate.Jso
*
*/
public class LoggedLocation implements DeepCompare {
- static final Map<List<String>, List<String>> layersCache =
- new HashMap<List<String>, List<String>>();
+ static final Map<List<String>, List<NodeName>> layersCache =
+ new HashMap<List<String>, List<NodeName>>();
/**
* The full path from the root of the network to the host.
*
* NOTE that this assumes that the network topology is a tree.
*/
- List<String> layers = Collections.emptyList();
+ List<NodeName> layers = Collections.emptyList();
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- public List<String> getLayers() {
+ public List<NodeName> getLayers() {
return layers;
}
@@ -66,16 +67,17 @@ public class LoggedLocation implements D
this.layers = Collections.emptyList();
} else {
synchronized (layersCache) {
- List<String> found = layersCache.get(layers);
+ List<NodeName> found = layersCache.get(layers);
if (found == null) {
// make a copy with interned string.
- List<String> clone = new ArrayList<String>(layers.size());
- for (String s : layers) {
- clone.add(s.intern());
- }
+ List<NodeName> clone = new ArrayList<NodeName>(layers.size());
+ clone.add(new NodeName(layers.get(0).intern(), null));
+ clone.add(new NodeName(null, layers.get(1).intern()));
+
// making it read-only as we are sharing them.
- List<String> readonlyLayers = Collections.unmodifiableList(clone);
- layersCache.put(readonlyLayers, readonlyLayers);
+ List<NodeName> readonlyLayers = Collections.unmodifiableList(clone);
+ List<String> readonlyLayersKey = Collections.unmodifiableList(layers);
+ layersCache.put(readonlyLayersKey, readonlyLayers);
this.layers = readonlyLayers;
} else {
this.layers = found;
@@ -84,7 +86,6 @@ public class LoggedLocation implements D
}
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -96,17 +97,33 @@ public class LoggedLocation implements D
}
// I'll treat this as an atomic object type
- private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
- String eltname) throws DeepInequalityException {
+ private void compareStrings(List<NodeName> c1, List<NodeName> c2,
+ TreePath loc, String eltname)
+ throws DeepInequalityException {
if (c1 == null && c2 == null) {
return;
}
TreePath recursePath = new TreePath(loc, eltname);
- if (c1 == null || c2 == null || !c1.equals(c2)) {
+ if (c1 == null || c2 == null || (c1.size() != c2.size())) {
throw new DeepInequalityException(eltname + " miscompared", recursePath);
}
+
+ for (NodeName n1 : c1) {
+ boolean found = false;
+ for (NodeName n2 : c2) {
+ if (n1.getValue().equals(n2.getValue())) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new DeepInequalityException(eltname
+ + " miscompared [" + n1.getValue() +"]", recursePath);
+ }
+ }
}
public void deepCompare(DeepCompare comparand, TreePath loc)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Fri Dec 16 14:20:58 2011
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.ArrayList;
import java.util.Comparator;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter;
/**
@@ -40,7 +41,7 @@ import org.codehaus.jackson.annotate.Jso
*
*/
public class LoggedNetworkTopology implements DeepCompare {
- String name;
+ NodeName name;
List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
static private Set<String> alreadySeenAnySetterAttributes =
@@ -50,7 +51,6 @@ public class LoggedNetworkTopology imple
super();
}
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -70,7 +70,7 @@ public class LoggedNetworkTopology imple
*/
static class TopoSort implements Comparator<LoggedNetworkTopology> {
public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
- return t1.name.compareTo(t2.name);
+ return t1.name.getValue().compareTo(t2.name.getValue());
}
}
@@ -83,8 +83,11 @@ public class LoggedNetworkTopology imple
* the level number
*/
LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
-
- this.name = name;
+ if (name == null) {
+ this.name = NodeName.ROOT;
+ } else {
+ this.name = new NodeName(name);
+ }
this.children = null;
if (level < ParsedHost.numberOfDistances() - 1) {
@@ -120,15 +123,15 @@ public class LoggedNetworkTopology imple
}
LoggedNetworkTopology(Set<ParsedHost> hosts) {
- this(hosts, "<root>", 0);
+ this(hosts, null, 0);
}
- public String getName() {
+ public NodeName getName() {
return name;
}
void setName(String name) {
- this.name = name;
+ this.name = new NodeName(name);
}
public List<LoggedNetworkTopology> getChildren() {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Dec 16 14:20:58 2011
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.mapreduce.jobhistory.Events;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
@@ -44,7 +44,7 @@ public class LoggedTask implements DeepC
long inputRecords = -1L;
long outputBytes = -1L;
long outputRecords = -1L;
- String taskID;
+ TaskID taskID;
long startTime = -1L;
long finishTime = -1L;
Pre21JobHistoryConstants.Values taskType;
@@ -55,7 +55,6 @@ public class LoggedTask implements DeepC
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -111,12 +110,12 @@ public class LoggedTask implements DeepC
this.outputRecords = outputRecords;
}
- public String getTaskID() {
+ public TaskID getTaskID() {
return taskID;
}
void setTaskID(String taskID) {
- this.taskID = taskID;
+ this.taskID = TaskID.forName(taskID);
}
public long getStartTime() {
@@ -357,7 +356,7 @@ public class LoggedTask implements DeepC
compare1(outputBytes, other.outputBytes, loc, "outputBytes");
compare1(outputRecords, other.outputRecords, loc, "outputRecords");
- compare1(taskID, other.taskID, loc, "taskID");
+ compare1(taskID.toString(), other.taskID.toString(), loc, "taskID");
compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime");
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Dec 16 14:20:58 2011
@@ -30,9 +30,11 @@ import org.codehaus.jackson.annotate.Jso
// the Jackson implementation of JSON doesn't handle a
// superclass-valued field.
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
/**
* A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
@@ -44,11 +46,11 @@ import org.apache.hadoop.mapreduce.jobhi
*/
public class LoggedTaskAttempt implements DeepCompare {
- String attemptID;
+ TaskAttemptID attemptID;
Pre21JobHistoryConstants.Values result;
long startTime = -1L;
long finishTime = -1L;
- String hostName;
+ NodeName hostName;
long hdfsBytesRead = -1L;
long hdfsBytesWritten = -1L;
@@ -188,7 +190,6 @@ public class LoggedTaskAttempt implement
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
- @SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) {
@@ -213,7 +214,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < clockSplits.length; ++i) {
result.add(clockSplits[i]);
}
-
+
this.clockSplits = result;
}
@@ -231,7 +232,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < cpuUsages.length; ++i) {
result.add(cpuUsages[i]);
}
-
+
this.cpuUsages = result;
}
@@ -249,7 +250,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < vMemKbytes.length; ++i) {
result.add(vMemKbytes[i]);
}
-
+
this.vMemKbytes = result;
}
@@ -267,7 +268,7 @@ public class LoggedTaskAttempt implement
for (int i = 0; i < physMemKbytes.length; ++i) {
result.add(physMemKbytes[i]);
}
-
+
this.physMemKbytes = result;
}
@@ -292,12 +293,12 @@ public class LoggedTaskAttempt implement
this.sortFinished = sortFinished;
}
- public String getAttemptID() {
+ public TaskAttemptID getAttemptID() {
return attemptID;
}
void setAttemptID(String attemptID) {
- this.attemptID = attemptID;
+ this.attemptID = TaskAttemptID.forName(attemptID);
}
public Pre21JobHistoryConstants.Values getResult() {
@@ -324,15 +325,17 @@ public class LoggedTaskAttempt implement
this.finishTime = finishTime;
}
- public String getHostName() {
+ public NodeName getHostName() {
return hostName;
}
+ // This is needed for JSON deserialization
void setHostName(String hostName) {
- this.hostName = hostName;
+ this.hostName = hostName == null ? null : new NodeName(hostName);
}
-
- // hostName is saved in the format rackName/NodeName
+
+ // In job-history, hostName is saved in the format rackName/NodeName
+ //TODO this is a hack! The '/' handling needs fixing.
void setHostName(String hostName, String rackName) {
if (hostName == null || hostName.length() == 0) {
throw new RuntimeException("Invalid entry! Missing hostname");
@@ -649,6 +652,20 @@ public class LoggedTaskAttempt implement
}
}
+ private void compare1(NodeName c1, NodeName c2, TreePath loc, String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
+ }
+
+ if (c1 == null || c2 == null) {
+ throw new DeepInequalityException(eltname + " miscompared", new TreePath(
+ loc, eltname));
+ }
+
+ compare1(c1.getValue(), c2.getValue(), new TreePath(loc, eltname), "value");
+ }
+
private void compare1(long c1, long c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 != c2) {
@@ -709,7 +726,7 @@ public class LoggedTaskAttempt implement
LoggedTaskAttempt other = (LoggedTaskAttempt) comparand;
- compare1(attemptID, other.attemptID, loc, "attemptID");
+ compare1(attemptID.toString(), other.attemptID.toString(), loc, "attemptID");
compare1(result, other.result, loc, "result");
compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime");
@@ -745,4 +762,4 @@ public class LoggedTaskAttempt implement
compare1(vMemKbytes, other.vMemKbytes, loc, "vMemKbytes");
compare1(physMemKbytes, other.physMemKbytes, loc, "physMemKbytes");
}
-}
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Fri Dec 16 14:20:58 2011
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
-class ParsedHost {
+import org.apache.hadoop.tools.rumen.datatypes.NodeName;
+
+public class ParsedHost {
private final String rackName;
private final String nodeName;
@@ -70,10 +72,10 @@ class ParsedHost {
}
public ParsedHost(LoggedLocation loc) {
- List<String> coordinates = loc.getLayers();
+ List<NodeName> coordinates = loc.getLayers();
- rackName = coordinates.get(0);
- nodeName = coordinates.get(1);
+ rackName = coordinates.get(0).getRackName();
+ nodeName = coordinates.get(1).getHostName();
}
LoggedLocation makeLoggedLocation() {
@@ -89,11 +91,11 @@ class ParsedHost {
return result;
}
- String getNodeName() {
+ public String getNodeName() {
return nodeName;
}
- String getRackName() {
+ public String getRackName() {
return rackName;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Fri Dec 16 14:20:58 2011
@@ -124,15 +124,16 @@ public class ZombieCluster extends Abstr
int level = levelMapping.get(n);
Node current;
if (level == leafLevel) { // a machine node
- MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
+ MachineNode.Builder builder =
+ new MachineNode.Builder(n.getName().getValue(), level);
if (defaultNode != null) {
builder.cloneFrom(defaultNode);
}
current = builder.build();
} else {
current = (level == leafLevel - 1)
- ? new RackNode(n.getName(), level) :
- new Node(n.getName(), level);
+ ? new RackNode(n.getName().getValue(), level) :
+ new Node(n.getName().getValue(), level);
path[level] = current;
// Add all children to the front of the queue.
for (LoggedNetworkTopology child : n.getChildren()) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1215141&r1=1215140&r2=1215141&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Dec 16 14:20:58 2011
@@ -28,12 +28,14 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
/**
@@ -128,7 +130,7 @@ public class ZombieJob implements JobSto
// file, are added first because the specialized values obtained from
// Rumen should override the job conf values.
//
- for (Map.Entry<Object, Object> entry : job.getJobProperties().entrySet()) {
+ for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
jobConf.set(entry.getKey().toString(), entry.getValue().toString());
}
@@ -161,12 +163,12 @@ public class ZombieJob implements JobSto
List<String> hostList = new ArrayList<String>();
if (locations != null) {
for (LoggedLocation location : locations) {
- List<String> layers = location.getLayers();
+ List<NodeName> layers = location.getLayers();
if (layers.size() == 0) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
continue;
}
- String host = layers.get(layers.size() - 1);
+ String host = layers.get(layers.size() - 1).getValue();
if (host == null) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
continue;
@@ -226,20 +228,20 @@ public class ZombieJob implements JobSto
@Override
public String getName() {
- String jobName = job.getJobName();
+ JobName jobName = job.getJobName();
if (jobName == null) {
return "(name unknown)";
} else {
- return jobName;
+ return jobName.getValue();
}
}
@Override
public JobID getJobID() {
- return JobID.forName(getLoggedJob().getJobID());
+ return getLoggedJob().getJobID();
}
- private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
+ private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
if (oldVal == -1) {
LOG.warn(name +" not defined for "+id);
return defaultVal;
@@ -269,8 +271,10 @@ public class ZombieJob implements JobSto
@Override
public String getQueueName() {
- String queue = job.getQueue();
- return (queue == null)? JobConf.DEFAULT_QUEUE_NAME : queue;
+ QueueName queue = job.getQueue();
+ return (queue == null || queue.getValue() == null)
+ ? JobConf.DEFAULT_QUEUE_NAME
+ : queue.getValue();
}
/**
@@ -357,13 +361,12 @@ public class ZombieJob implements JobSto
for (LoggedTask map : job.getMapTasks()) {
map = sanitizeLoggedTask(map);
if (map != null) {
- loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
+ loggedTaskMap.put(maskTaskID(map.taskID), map);
for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
if (mapAttempt != null) {
- TaskAttemptID id = TaskAttemptID.forName(mapAttempt
- .getAttemptID());
+ TaskAttemptID id = mapAttempt.getAttemptID();
loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
}
}
@@ -372,13 +375,12 @@ public class ZombieJob implements JobSto
for (LoggedTask reduce : job.getReduceTasks()) {
reduce = sanitizeLoggedTask(reduce);
if (reduce != null) {
- loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
+ loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);
for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
if (reduceAttempt != null) {
- TaskAttemptID id = TaskAttemptID.forName(reduceAttempt
- .getAttemptID());
+ TaskAttemptID id = reduceAttempt.getAttemptID();
loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
}
}
@@ -391,8 +393,10 @@ public class ZombieJob implements JobSto
@Override
public String getUser() {
- String retval = job.getUser();
- return (retval==null)?"(unknown)":retval;
+ UserName retval = job.getUser();
+ return (retval == null || retval.getValue() == null)
+ ? "(unknown)"
+ : retval.getValue();
}
/**
@@ -511,7 +515,7 @@ public class ZombieJob implements JobSto
}
}
- private long sanitizeTaskRuntime(long time, String id) {
+ private long sanitizeTaskRuntime(long time, ID id) {
if (time < 0) {
LOG.warn("Negative running time for task "+id+": "+time);
return 100L; // set default to 100ms.
@@ -547,7 +551,7 @@ public class ZombieJob implements JobSto
private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
int distance = cluster.getMaximumDistance();
- String rackHostName = loggedAttempt.getHostName();
+ String rackHostName = loggedAttempt.getHostName().getValue();
if (rackHostName == null) {
return distance;
}
@@ -558,11 +562,11 @@ public class ZombieJob implements JobSto
List<LoggedLocation> locations = loggedTask.getPreferredLocations();
if (locations != null) {
for (LoggedLocation location : locations) {
- List<String> layers = location.getLayers();
+ List<NodeName> layers = location.getLayers();
if ((layers == null) || (layers.isEmpty())) {
continue;
}
- String dataNodeName = layers.get(layers.size()-1);
+ String dataNodeName = layers.get(layers.size()-1).getValue();
MachineNode dataNode = cluster.getMachineByName(dataNodeName);
if (dataNode != null) {
distance = Math.min(distance, cluster.distance(mn, dataNode));
@@ -690,8 +694,8 @@ public class ZombieJob implements JobSto
private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
int taskAttemptNumber) {
- return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
- taskType, taskNumber), taskAttemptNumber);
+ return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber),
+ taskAttemptNumber);
}
private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
@@ -704,7 +708,7 @@ public class ZombieJob implements JobSto
state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
runtime = makeUpMapRuntime(state, locality);
runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
- taskNumber, taskAttemptNumber).toString());
+ taskNumber, taskAttemptNumber));
TaskAttemptInfo tai
= new MapTaskAttemptInfo(state, taskInfo, runtime, null);
return tai;
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/DataAnonymizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/DataAnonymizer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/DataAnonymizer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/DataAnonymizer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.anonymization;
+
+import org.apache.hadoop.tools.rumen.state.State;
+
+/**
+ * The data anonymizer interface.
+ */
+public interface DataAnonymizer<T> {
+ T anonymize(T data, State state);
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordList.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordList.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordList.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordList.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,106 @@
+package org.apache.hadoop.tools.rumen.anonymization;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.tools.rumen.state.State;
+
+/**
+ * Represents the list of words used in list-backed anonymizers.
+ */
+public class WordList implements State {
+ private Map<String, Integer> list = new HashMap<String, Integer>(0);
+ private boolean isUpdated = false;
+ private String name;
+
+ public WordList() {
+ this("word");
+ }
+
+ public WordList(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Adds the specified word to the list if the word is not already added.
+ */
+ public void add(String word) {
+ if (!contains(word)) {
+ int index = getSize();
+ list.put(word, index);
+ isUpdated = true;
+ }
+ }
+
+ /**
+ * Returns 'true' if the list contains the specified word.
+ */
+ public boolean contains(String word) {
+ return list.containsKey(word);
+ }
+
+ /**
+ * Returns the index of the specified word in the list.
+ */
+ public int indexOf(String word) {
+ return list.get(word);
+ }
+
+ /**
+ * Returns the size of the list.
+ */
+ public int getSize() {
+ return list.size();
+ }
+
+ /**
+   * Returns 'true' if the list has been updated since creation (or reload).
+ */
+ @Override
+ public boolean isUpdated() {
+ return isUpdated;
+ }
+
+ /**
+ * Setters and getters for Jackson JSON
+ */
+ /**
+ * Sets the size of the list.
+ *
+   * Note: This API is only for Jackson JSON deserialization.
+ */
+ public void setSize(int size) {
+ list = new HashMap<String, Integer>(size);
+ }
+
+ /**
+   * Note: This API is only for Jackson JSON deserialization.
+ */
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets the words.
+ *
+   * Note: This API is only for Jackson JSON serialization.
+ */
+ public Map<String, Integer> getWords() {
+ return list;
+ }
+
+ /**
+ * Sets the words.
+ *
+   * Note: This API is only for Jackson JSON deserialization.
+ */
+ public void setWords(Map<String, Integer> list) {
+ this.list = list;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.anonymization;
+
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Utility class to handle commonly performed tasks in a
+ * {@link org.apache.hadoop.tools.rumen.datatypes.DefaultAnonymizableDataType}
+ * using a {@link WordList} for anonymization.
+ * //TODO There is no caching for saving memory.
+ */
+public class WordListAnonymizerUtility {
+ public static final String[] KNOWN_WORDS =
+ new String[] {"job", "tmp", "temp", "home", "homes", "usr", "user", "test"};
+
+ /**
+   * Checks if the data needs anonymization. Typically, data types which are
+   * numeric in nature don't need anonymization.
+ */
+ public static boolean needsAnonymization(String data) {
+ // Numeric data doesn't need anonymization
+    // Currently this doesn't support inputs like
+ // - 12.3
+ // - 12.3f
+ // - 90L
+ // - 1D
+ if (StringUtils.isNumeric(data)) {
+ return false;
+ }
+ return true; // by default return true
+ }
+
+ /**
+ * Checks if the given data has a known suffix.
+ */
+ public static boolean hasSuffix(String data, String[] suffixes) {
+ // check if they end in known suffixes
+ for (String ks : suffixes) {
+ if (data.endsWith(ks)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Extracts a known suffix from the given data.
+ *
+ * @throws RuntimeException if the data doesn't have a suffix.
+ * Use {@link #hasSuffix(String, String[])} to make sure that the
+ * given data has a suffix.
+ */
+ public static String[] extractSuffix(String data, String[] suffixes) {
+ // check if they end in known suffixes
+ String suffix = "";
+ for (String ks : suffixes) {
+ if (data.endsWith(ks)) {
+ suffix = ks;
+        // strip off the suffix, which will get appended later
+ data = data.substring(0, data.length() - suffix.length());
+ return new String[] {data, suffix};
+ }
+ }
+
+ // throw exception
+ throw new RuntimeException("Data [" + data + "] doesn't have a suffix from"
+ + " known suffixes [" + StringUtils.join(suffixes, ',') + "]");
+ }
+
+ /**
+ * Checks if the given data is known. This API uses {@link #KNOWN_WORDS} to
+ * detect if the given data is a commonly used (so called 'known') word.
+ */
+ public static boolean isKnownData(String data) {
+ return isKnownData(data, KNOWN_WORDS);
+ }
+
+ /**
+ * Checks if the given data is known.
+ */
+ public static boolean isKnownData(String data, String[] knownWords) {
+ // check if the data is known content
+ //TODO [Chunking] Do this for sub-strings of data
+
+ for (String kd : knownWords) {
+ if (data.equals(kd)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/AnonymizableDataType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/AnonymizableDataType.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/AnonymizableDataType.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/AnonymizableDataType.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+
+/**
+ * An interface for data-types that can be anonymized.
+ */
+public interface AnonymizableDataType<T> extends DataType<T> {
+ public T getAnonymizedValue(StatePool statePool, Configuration conf);
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/ClassName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/ClassName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/ClassName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/ClassName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Represents a class name.
+ */
+public class ClassName extends DefaultAnonymizableDataType {
+ public static final String CLASSNAME_PRESERVE_CONFIG = "rumen.data-types.classname.preserve";
+ private final String className;
+
+ public ClassName(String className) {
+ super();
+ this.className = className;
+ }
+
+ @Override
+ public String getValue() {
+ return className;
+ }
+
+ @Override
+ protected String getPrefix() {
+ return "class";
+ }
+
+ @Override
+ protected boolean needsAnonymization(Configuration conf) {
+ String[] preserves = conf.getStrings(CLASSNAME_PRESERVE_CONFIG);
+ if (preserves != null) {
+ // do a simple starts with check
+ for (String p : preserves) {
+ if (className.startsWith(p)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DataType.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DataType.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DataType.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+/**
+ * Represents a Rumen data-type.
+ */
+public interface DataType<T> {
+ T getValue();
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultAnonymizableDataType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultAnonymizableDataType.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultAnonymizableDataType.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultAnonymizableDataType.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.anonymization.WordList;
+import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+
+/**
+ * Represents a default anonymizable Rumen data-type. It uses
+ * {@link WordListAnonymizerUtility} for anonymization.
+ */
+public abstract class DefaultAnonymizableDataType
+implements AnonymizableDataType<String> {
+ private static final String DEFAULT_PREFIX = "data";
+
+ protected String getPrefix() {
+ return DEFAULT_PREFIX;
+ }
+
+ // Determines if the contained data needs anonymization
+ protected boolean needsAnonymization(Configuration conf) {
+ return true;
+ }
+
+ @Override
+ public final String getAnonymizedValue(StatePool statePool,
+ Configuration conf) {
+ if (needsAnonymization(conf)) {
+ WordList state = (WordList) statePool.getState(getClass());
+ if (state == null) {
+ state = new WordList(getPrefix());
+ statePool.addState(getClass(), state);
+ }
+ return anonymize(getValue(), state);
+ } else {
+ return getValue();
+ }
+ }
+
+ private static String anonymize(String data, WordList wordList) {
+ if (data == null) {
+ return null;
+ }
+
+ if (!wordList.contains(data)) {
+ wordList.add(data);
+ }
+ return wordList.getName() + wordList.indexOf(data);
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultDataType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultDataType.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultDataType.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/DefaultDataType.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+/**
+ * This represents the default java data-types (like int, long, float etc).
+ */
+public class DefaultDataType implements DataType<String> {
+ private String value;
+
+ public DefaultDataType(String value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the value of the attribute.
+ */
+ @Override
+ public String getValue() {
+ return value;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/FileName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/FileName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/FileName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/FileName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.anonymization.WordList;
+import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
+import org.apache.hadoop.tools.rumen.state.State;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Represents a file's location.
+ *
+ * Currently, only filenames that can be represented using {@link Path} are
+ * supported. The anonymized form replaces each directory component and the
+ * final file component with an indexed placeholder drawn from per-kind
+ * {@link WordList}s, while preserving path separators, '.'/'..' components,
+ * known file suffixes, and the URI scheme.
+ */
+public class FileName implements AnonymizableDataType<String> {
+  // Original (possibly comma-separated, multi-file) filename string.
+  private final String fileName;
+  // Lazily-computed anonymized form; null until first getAnonymizedValue().
+  private String anonymizedFileName;
+  private static final String PREV_DIR = "..";
+  // Suffixes that are preserved verbatim on anonymized file components.
+  private static final String[] KNOWN_SUFFIXES =
+    new String[] {".xml", ".jar", ".txt", ".tar", ".zip", ".json", ".gzip",
+                  ".lzo"};
+
+  /**
+   * A composite state for filename.
+   * Tracks two independent word lists: one for directory components and one
+   * for file (leaf) components, persisted under the name "path".
+   */
+  public static class FileNameState implements State {
+    private WordList dirState = new WordList("dir");
+    private WordList fileNameState = new WordList("file");
+
+    /** @return true if either the directory or the file word list changed. */
+    @Override
+    public boolean isUpdated() {
+      return dirState.isUpdated() || fileNameState.isUpdated();
+    }
+
+    public WordList getDirectoryState() {
+      return dirState;
+    }
+
+    public WordList getFileNameState() {
+      return fileNameState;
+    }
+
+    public void setDirectoryState(WordList state) {
+      this.dirState = state;
+    }
+
+    public void setFileNameState(WordList state) {
+      this.fileNameState = state;
+    }
+
+    /** Fixed state name used as the key when (de)serializing this state. */
+    @Override
+    public String getName() {
+      return "path";
+    }
+
+    @Override
+    public void setName(String name) {
+      // for now, simply assert since this class has a hardcoded name
+      if (!getName().equals(name)) {
+        throw new RuntimeException("State name mismatch! Expected '"
+                                   + getName() + "' but found '" + name + "'.");
+      }
+    }
+  }
+
+  /**
+   * @param fileName the raw filename string; may contain several filenames
+   *                 (it is later split via Hadoop's StringUtils.split —
+   *                 presumably on commas, TODO confirm the separator).
+   */
+  public FileName(String fileName) {
+    this.fileName = fileName;
+  }
+
+  /** @return the original, un-anonymized filename string. */
+  @Override
+  public String getValue() {
+    return fileName;
+  }
+
+  /**
+   * Returns the anonymized filename, computing and caching it on first call.
+   * Subsequent calls return the cached value regardless of statePool/conf.
+   */
+  @Override
+  public String getAnonymizedValue(StatePool statePool,
+                                   Configuration conf) {
+    if (anonymizedFileName == null) {
+      anonymize(statePool, conf);
+    }
+    return anonymizedFileName;
+  }
+
+  // Anonymizes each filename in the (split) list independently, then rejoins
+  // them with StringUtils.arrayToString. State is shared across all entries.
+  private void anonymize(StatePool statePool, Configuration conf) {
+    FileNameState fState = (FileNameState) statePool.getState(getClass());
+    if (fState == null) {
+      // first FileName seen for this pool: register fresh state
+      fState = new FileNameState();
+      statePool.addState(getClass(), fState);
+    }
+
+    String[] files = StringUtils.split(fileName);
+    String[] anonymizedFileNames = new String[files.length];
+    int i = 0;
+    for (String f : files) {
+      anonymizedFileNames[i++] =
+        anonymize(statePool, conf, fState, f);
+    }
+
+    anonymizedFileName = StringUtils.arrayToString(anonymizedFileNames);
+  }
+
+  // Anonymizes a single filename, treated as a URI: the path is anonymized
+  // component-wise, and if a scheme is present the authority is anonymized
+  // via NodeName. NOTE(review): only uri.getHost() is used for the
+  // authority, so any port or user-info in the original authority is
+  // silently dropped — confirm this is intended.
+  private static String anonymize(StatePool statePool, Configuration conf,
+                                  FileNameState fState, String fileName) {
+    String ret = null;
+    try {
+      URI uri = new URI(fileName);
+
+      // anonymize the path i.e without the authority & scheme
+      ret =
+        anonymizePath(uri.getPath(), fState.getDirectoryState(),
+                      fState.getFileNameState());
+
+      // anonymize the authority and scheme
+      String authority = uri.getAuthority();
+      String scheme = uri.getScheme();
+      if (scheme != null) {
+        String anonymizedAuthority = "";
+        if (authority != null) {
+          // anonymize the authority
+          NodeName hostName = new NodeName(null, uri.getHost());
+          anonymizedAuthority = hostName.getAnonymizedValue(statePool, conf);
+        }
+        ret = scheme + "://" + anonymizedAuthority + ret;
+      }
+    } catch (URISyntaxException use) {
+      // unparseable filenames abort anonymization rather than leak raw data
+      throw new RuntimeException (use);
+    }
+
+    return ret;
+  }
+
+  // Anonymize the file-path: separators and '.'/'..' pass through unchanged;
+  // every other component is a directory except the last token, which is
+  // treated as the file name (note: a trailing '/' makes the last directory
+  // the final token, so it would be anonymized with the file word list).
+  private static String anonymizePath(String path, WordList dState,
+                                      WordList fState) {
+    StringBuilder buffer = new StringBuilder();
+    StringTokenizer tokenizer = new StringTokenizer(path, Path.SEPARATOR, true);
+    while (tokenizer.hasMoreTokens()) {
+      String token = tokenizer.nextToken();
+      if (Path.SEPARATOR.equals(token)) {
+        buffer.append(token);
+      } else if (Path.CUR_DIR.equals(token)) {
+        buffer.append(token);
+      } else if (PREV_DIR.equals(token)) {
+        buffer.append(token);
+      } else if (tokenizer.hasMoreTokens()){
+        // this component is a directory
+        buffer.append(anonymize(token, dState));
+      } else {
+        // this component is a file
+        buffer.append(anonymize(token, fState));
+      }
+    }
+
+    return buffer.toString();
+  }
+
+  //TODO There is no caching for saving memory.
+  // Core single-component anonymizer: strips a known suffix (if any),
+  // replaces the remainder with "<listName><index>" unless it is known-safe
+  // content, then re-appends the suffix.
+  private static String anonymize(String data, WordList wordList) {
+    if (data == null) {
+      return null;
+    }
+
+    if (WordListAnonymizerUtility.needsAnonymization(data)) {
+      String suffix = "";
+      String coreData = data;
+      // check and extract suffix
+      if (WordListAnonymizerUtility.hasSuffix(data, KNOWN_SUFFIXES)) {
+        // check if the data ends with a known suffix
+        String[] split =
+          WordListAnonymizerUtility.extractSuffix(data, KNOWN_SUFFIXES);
+        suffix = split[1];
+        coreData = split[0];
+      }
+
+      // check if the data is known content
+      //TODO [Chunking] Do this for sub-strings of data
+      String anonymizedData = coreData;
+      if (!WordListAnonymizerUtility.isKnownData(coreData)) {
+        // unseen word: register it so the same word maps to the same index
+        if (!wordList.contains(coreData)) {
+          wordList.add(coreData);
+        }
+        anonymizedData = wordList.getName() + wordList.indexOf(coreData);
+      }
+
+      return anonymizedData + suffix;
+    } else {
+      return data;
+    }
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+
+/**
+ * Represents a job's name.
+ * Anonymization is inherited from {@link DefaultAnonymizableDataType};
+ * anonymized values are prefixed with "job".
+ */
+public class JobName extends DefaultAnonymizableDataType {
+  // Original job name as recorded in the trace.
+  private final String jobName;
+
+  /**
+   * @param jobName the raw job name (no validation is performed).
+   */
+  public JobName(String jobName) {
+    super();
+    this.jobName = jobName;
+  }
+
+  /** @return the original, un-anonymized job name. */
+  @Override
+  public String getValue() {
+    return jobName;
+  }
+
+  /** Prefix used by the superclass when emitting anonymized job names. */
+  @Override
+  protected String getPrefix() {
+    return "job";
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobProperties.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobProperties.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobProperties.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/JobProperties.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.datatypes.util.JobPropertyParser;
+import org.apache.hadoop.tools.rumen.datatypes.util.MapReduceJobPropertiesParser;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This represents the job configuration properties.
+ * Anonymization delegates each key/value pair to a chain of
+ * {@link JobPropertyParser}s; the first parser that recognizes a key
+ * supplies its (possibly anonymizable) {@link DataType} value, and
+ * unrecognized keys are dropped from the result.
+ */
+public class JobProperties implements AnonymizableDataType<Properties> {
+  /** Config key listing custom JobPropertyParser classes (comma-separated). */
+  public static final String PARSERS_CONFIG_KEY =
+    "rumen.datatypes.jobproperties.parsers";
+  // Original job configuration; never mutated by this class.
+  private final Properties jobProperties;
+
+  /** Creates an instance backed by an empty property set. */
+  public JobProperties() {
+    this(new Properties());
+  }
+
+  /**
+   * @param properties the job's configuration properties (may be null, in
+   *                   which case getAnonymizedValue() returns null).
+   */
+  public JobProperties(Properties properties) {
+    this.jobProperties = properties;
+  }
+
+  /** @return the original, unfiltered job properties. */
+  public Properties getValue() {
+    return jobProperties;
+  }
+
+  /**
+   * Filters and anonymizes the job properties. NOTE(review): the result is
+   * recomputed on every call (no caching, unlike FileName/NodeName), and
+   * DataType objects — not Strings — are stored as Properties values, which
+   * bends the java.util.Properties String-only contract; confirm downstream
+   * serialization handles this.
+   */
+  @Override
+  public Properties getAnonymizedValue(StatePool statePool,
+                                       Configuration conf) {
+    Properties filteredProperties = null;
+    List<JobPropertyParser> pList = new ArrayList<JobPropertyParser>(1);
+    // load the parsers
+    String config = conf.get(PARSERS_CONFIG_KEY);
+    if (config != null) {
+      @SuppressWarnings("unchecked")
+      Class<JobPropertyParser>[] parsers =
+        (Class[])conf.getClasses(PARSERS_CONFIG_KEY);
+      for (Class<JobPropertyParser> c : parsers) {
+        JobPropertyParser parser = ReflectionUtils.newInstance(c, conf);
+        pList.add(parser);
+      }
+    } else {
+      // add the default MapReduce filter
+      JobPropertyParser parser = new MapReduceJobPropertiesParser();
+      pList.add(parser);
+    }
+
+    // filter out the desired config key-value pairs
+    if (jobProperties != null) {
+      filteredProperties = new Properties();
+      // define a configuration object and load it with original job properties
+      for (Map.Entry<Object, Object> entry : jobProperties.entrySet()) {
+        //TODO Check for null key/value?
+        String key = entry.getKey().toString();
+        String value = entry.getValue().toString();
+
+        // find a parser for this key; first match wins, rest are skipped
+        for (JobPropertyParser p : pList) {
+          DataType<?> pValue = p.parseJobProperty(key, value);
+          if (pValue != null) {
+            filteredProperties.put(key, pValue);
+            break;
+          }
+        }
+      }
+    }
+    return filteredProperties;
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/NodeName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/NodeName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/NodeName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/NodeName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.ParsedHost;
+import org.apache.hadoop.tools.rumen.anonymization.WordList;
+import org.apache.hadoop.tools.rumen.state.State;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * Represents the cluster host.
+ * A node name is either a bare host ("host"), a rack ("rack"), or a
+ * rack-qualified host ("/rack/host"). Anonymized racks and hosts are drawn
+ * from two separate {@link WordList}s so the same rack/host always maps to
+ * the same placeholder.
+ */
+public class NodeName implements AnonymizableDataType<String> {
+  // Host component; may be null when this instance denotes a rack only.
+  private String hostName;
+  // Rack component; may be null when the rack is unknown.
+  private String rackName;
+  // The full original name as given ("/rack/host", "host", or "rack").
+  private String nodeName;
+  // Lazily-computed anonymized form; null until first getAnonymizedValue().
+  private String anonymizedNodeName;
+
+  /** Sentinel for the topology root; returned unanonymized. */
+  public static final NodeName ROOT = new NodeName("<root>");
+
+  /**
+   * A composite state for node-name.
+   * Holds one word list for racks and one for hosts, persisted as "node".
+   */
+  public static class NodeNameState implements State {
+    private WordList rackNameState = new WordList("rack");
+    private WordList hostNameState = new WordList("host");
+
+    /** @return true if either the rack or host word list changed. */
+    @Override
+    @JsonIgnore
+    public boolean isUpdated() {
+      return rackNameState.isUpdated() || hostNameState.isUpdated();
+    }
+
+    public WordList getRackNameState() {
+      return rackNameState;
+    }
+
+    public WordList getHostNameState() {
+      return hostNameState;
+    }
+
+    public void setRackNameState(WordList state) {
+      this.rackNameState = state;
+    }
+
+    public void setHostNameState(WordList state) {
+      this.hostNameState = state;
+    }
+
+    /** Fixed state name used as the key when (de)serializing this state. */
+    @Override
+    public String getName() {
+      return "node";
+    }
+
+    @Override
+    public void setName(String name) {
+      // for now, simply assert since this class has a hardcoded name
+      if (!getName().equals(name)) {
+        throw new RuntimeException("State name mismatch! Expected '"
+                                   + getName() + "' but found '" + name + "'.");
+      }
+    }
+  }
+
+  /**
+   * Parses a combined node name. If ParsedHost cannot parse it, the whole
+   * string is treated as a bare host with an unknown rack.
+   */
+  public NodeName(String nodeName) {
+    this.nodeName = nodeName;
+    ParsedHost pHost = ParsedHost.parse(nodeName);
+    if (pHost == null) {
+      this.rackName = null;
+      this.hostName = nodeName;
+    } else {
+      //TODO check for null and improve .. possibly call NodeName(r,h)
+      this.rackName = pHost.getRackName();
+      this.hostName = pHost.getNodeName();
+    }
+  }
+
+  /**
+   * Builds a node name from separate rack and host parts; empty strings are
+   * normalized to null. With only a rack, the instance denotes the rack
+   * itself (hostName stays null). With only a host, the host is re-parsed
+   * in case it embeds a rack. With both, nodeName becomes "/rack/host".
+   */
+  public NodeName(String rName, String hName) {
+    // normalize "" to null so the branches below only test for null
+    rName = (rName == null)
+            ? rName
+            : rName.length() == 0
+              ? null
+              : rName;
+    hName = (hName == null)
+            ? hName
+            : hName.length() == 0
+              ? null
+              : hName;
+    if (hName == null) {
+      nodeName = rName;
+      rackName = rName;
+    } else if (rName == null) {
+      nodeName = hName;
+      ParsedHost pHost = ParsedHost.parse(nodeName);
+      if (pHost == null) {
+        this.rackName = null;
+        this.hostName = hName;
+      } else {
+        this.rackName = pHost.getRackName();
+        this.hostName = pHost.getNodeName();
+      }
+    } else {
+      rackName = rName;
+      this.hostName = hName;
+      this.nodeName = "/" + rName + "/" + hName;
+    }
+  }
+
+  /** @return the host component, or null for rack-only instances. */
+  public String getHostName() {
+    return hostName;
+  }
+
+  /** @return the rack component, or null when the rack is unknown. */
+  public String getRackName() {
+    return rackName;
+  }
+
+  /** @return the original, un-anonymized node name. */
+  @Override
+  public String getValue() {
+    return nodeName;
+  }
+
+  /**
+   * Returns the anonymized node name, computing and caching it on first
+   * call. ROOT is returned as-is. Note: the conf parameter is unused here;
+   * it exists only to satisfy the AnonymizableDataType interface.
+   */
+  @Override
+  public String getAnonymizedValue(StatePool statePool, Configuration conf) {
+    if (this.getValue().equals(ROOT.getValue())) {
+      return getValue();
+    }
+    if (anonymizedNodeName == null) {
+      anonymize(statePool);
+    }
+    return anonymizedNodeName;
+  }
+
+  // Builds "/rackN/hostM" when both parts are known; otherwise anonymizes
+  // the single component, preferring the rack list when the name was seen
+  // as a rack before (or rackName is set) so rack/host indices stay stable.
+  private void anonymize(StatePool pool) {
+    StringBuffer buf = new StringBuffer();
+    NodeNameState state = (NodeNameState) pool.getState(getClass());
+    if (state == null) {
+      // first NodeName seen for this pool: register fresh state
+      state = new NodeNameState();
+      pool.addState(getClass(), state);
+    }
+
+    if (rackName != null && hostName != null) {
+      buf.append('/');
+      buf.append(anonymize(rackName, state.getRackNameState()));
+      buf.append('/');
+      buf.append(anonymize(hostName, state.getHostNameState()));
+    } else {
+      if (state.getRackNameState().contains(nodeName) || rackName != null) {
+        buf.append(anonymize(nodeName, state.getRackNameState()));
+      } else {
+        buf.append(anonymize(nodeName, state.getHostNameState()));
+      }
+    }
+
+    anonymizedNodeName = buf.toString();
+  }
+
+  //TODO There is no caching for saving memory.
+  // Maps a word to "<listName><index>", registering it in the list on first
+  // sight so repeated words get identical placeholders.
+  private static String anonymize(String data, WordList wordList) {
+    if (data == null) {
+      return null;
+    }
+
+    if (!wordList.contains(data)) {
+      wordList.add(data);
+    }
+    return wordList.getName() + wordList.indexOf(data);
+  }
+}
\ No newline at end of file