You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC
svn commit: r1077515 [3/4] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/
test/tools/data/rumen/small-trace-test/
test/tools/data/rumen/small-trace-test/counters-fo...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Mar 4 04:22:59 2011
@@ -47,9 +47,6 @@ public class LoggedTask implements DeepC
List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
List<LoggedLocation> preferredLocations = Collections.emptyList();
- int numberMaps = -1;
- int numberReduces = -1;
-
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
@@ -68,6 +65,15 @@ public class LoggedTask implements DeepC
super();
}
+ void adjustTimes(long adjustment) {
+ startTime += adjustment;
+ finishTime += adjustment;
+
+ for (LoggedTaskAttempt attempt : attempts) {
+ attempt.adjustTimes(adjustment);
+ }
+ }
+
public long getInputBytes() {
return inputBytes;
}
@@ -148,22 +154,6 @@ public class LoggedTask implements DeepC
}
}
- public int getNumberMaps() {
- return numberMaps;
- }
-
- void setNumberMaps(int numberMaps) {
- this.numberMaps = numberMaps;
- }
-
- public int getNumberReduces() {
- return numberReduces;
- }
-
- void setNumberReduces(int numberReduces) {
- this.numberReduces = numberReduces;
- }
-
public Pre21JobHistoryConstants.Values getTaskStatus() {
return taskStatus;
}
@@ -180,6 +170,110 @@ public class LoggedTask implements DeepC
this.taskType = taskType;
}
+ private void incorporateMapCounters(JhCounters counters) {
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.inputBytes = val;
+ }
+ }, counters, "HDFS_BYTES_READ");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.outputBytes = val;
+ }
+ }, counters, "FILE_BYTES_WRITTEN");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.inputRecords = val;
+ }
+ }, counters, "MAP_INPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.outputRecords = val;
+ }
+ }, counters, "MAP_OUTPUT_RECORDS");
+ }
+
+ private void incorporateReduceCounters(JhCounters counters) {
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.inputBytes = val;
+ }
+ }, counters, "REDUCE_SHUFFLE_BYTES");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.outputBytes = val;
+ }
+ }, counters, "HDFS_BYTES_WRITTEN");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.inputRecords = val;
+ }
+ }, counters, "REDUCE_INPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ task.outputRecords = val;
+ }
+ }, counters, "REDUCE_OUTPUT_RECORDS");
+ }
+
+ // incorporate event counters
+ // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL
+ public void incorporateCounters(JhCounters counters) {
+ switch (taskType) {
+ case MAP:
+ incorporateMapCounters(counters);
+ return;
+ case REDUCE:
+ incorporateReduceCounters(counters);
+ return;
+ // NOT exhaustive
+ }
+ }
+
+ private static String canonicalizeCounterName(String nonCanonicalName) {
+ String result = nonCanonicalName.toLowerCase();
+
+ result = result.replace(' ', '|');
+ result = result.replace('-', '|');
+ result = result.replace('_', '|');
+ result = result.replace('.', '|');
+
+ return result;
+ }
+
+ private abstract class SetField {
+ LoggedTask task;
+
+ SetField(LoggedTask task) {
+ this.task = task;
+ }
+
+ abstract void set(long value);
+ }
+
+ private static void incorporateCounter(SetField thunk, JhCounters counters,
+ String counterName) {
+ counterName = canonicalizeCounterName(counterName);
+
+ for (JhCounterGroup group : counters.groups) {
+ for (JhCounter counter : group.counts) {
+ if (counterName
+ .equals(canonicalizeCounterName(counter.name.toString()))) {
+ thunk.set(counter.value);
+ return;
+ }
+ }
+ }
+ }
+
private void compare1(long c1, long c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 != c2) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Mar 4 04:22:59 2011
@@ -82,6 +82,11 @@ public class LoggedTaskAttempt implement
}
}
+ void adjustTimes(long adjustment) {
+ startTime += adjustment;
+ finishTime += adjustment;
+ }
+
public long getShuffleFinished() {
return shuffleFinished;
}
@@ -135,7 +140,7 @@ public class LoggedTaskAttempt implement
}
void setHostName(String hostName) {
- this.hostName = (hostName == null) ? null : hostName.intern();
+ this.hostName = hostName == null ? null : hostName.intern();
}
public long getHdfsBytesRead() {
@@ -258,6 +263,130 @@ public class LoggedTaskAttempt implement
this.mapInputBytes = mapInputBytes;
}
+ // incorporate event counters
+ public void incorporateCounters(JhCounters counters) {
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.hdfsBytesRead = val;
+ }
+ }, counters, "HDFS_BYTES_READ");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.hdfsBytesWritten = val;
+ }
+ }, counters, "HDFS_BYTES_WRITTEN");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.fileBytesRead = val;
+ }
+ }, counters, "FILE_BYTES_READ");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.fileBytesWritten = val;
+ }
+ }, counters, "FILE_BYTES_WRITTEN");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.mapInputBytes = val;
+ }
+ }, counters, "MAP_INPUT_BYTES");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.mapInputRecords = val;
+ }
+ }, counters, "MAP_INPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.mapOutputBytes = val;
+ }
+ }, counters, "MAP_OUTPUT_BYTES");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.mapOutputRecords = val;
+ }
+ }, counters, "MAP_OUTPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.combineInputRecords = val;
+ }
+ }, counters, "COMBINE_INPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.reduceInputGroups = val;
+ }
+ }, counters, "REDUCE_INPUT_GROUPS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.reduceInputRecords = val;
+ }
+ }, counters, "REDUCE_INPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.reduceShuffleBytes = val;
+ }
+ }, counters, "REDUCE_SHUFFLE_BYTES");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.reduceOutputRecords = val;
+ }
+ }, counters, "REDUCE_OUTPUT_RECORDS");
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ attempt.spilledRecords = val;
+ }
+ }, counters, "SPILLED_RECORDS");
+ }
+
+ private static String canonicalizeCounterName(String nonCanonicalName) {
+ String result = nonCanonicalName.toLowerCase();
+
+ result = result.replace(' ', '|');
+ result = result.replace('-', '|');
+ result = result.replace('_', '|');
+ result = result.replace('.', '|');
+
+ return result;
+ }
+
+ private abstract class SetField {
+ LoggedTaskAttempt attempt;
+
+ SetField(LoggedTaskAttempt attempt) {
+ this.attempt = attempt;
+ }
+
+ abstract void set(long value);
+ }
+
+ private static void incorporateCounter(SetField thunk, JhCounters counters,
+ String counterName) {
+ counterName = canonicalizeCounterName(counterName);
+
+ for (JhCounterGroup group : counters.groups) {
+ for (JhCounter counter : group.counts) {
+ if (counterName
+ .equals(canonicalizeCounterName(counter.name.toString()))) {
+ thunk.set(counter.value);
+ return;
+ }
+ }
+ }
+ }
+
private void compare1(String c1, String c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 == null && c2 == null) {
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class MapAttempt20LineHistoryEventEmitter extends
+ TaskAttempt20LineEventEmitter {
+
+ static List<SingleEventEmitter> nonFinals =
+ new LinkedList<SingleEventEmitter>();
+ static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+ static {
+ nonFinals.addAll(taskEventNonFinalSEEs);
+
+ finals.add(new MapAttemptFinishedEventEmitter());
+ }
+
+ protected MapAttempt20LineHistoryEventEmitter() {
+ super();
+ }
+
+ static private class MapAttemptFinishedEventEmitter extends
+ SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+ HistoryEventEmitter thatg) {
+ if (taskAttemptIDName == null) {
+ return null;
+ }
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+ String finishTime = line.get("FINISH_TIME");
+ String status = line.get("TASK_STATUS");
+
+ if (finishTime != null && status != null
+ && status.equalsIgnoreCase("success")) {
+ String hostName = line.get("HOSTNAME");
+ String counters = line.get("COUNTERS");
+ String state = line.get("STATE_STRING");
+
+ MapAttempt20LineHistoryEventEmitter that =
+ (MapAttempt20LineHistoryEventEmitter) thatg;
+
+ if (finishTime != null && "success".equalsIgnoreCase(status)) {
+ return new MapAttemptFinishedEvent(taskAttemptID,
+ that.originalTaskType, status, Long.parseLong(finishTime), Long
+ .parseLong(finishTime), hostName, state,
+ maybeParseCounters(counters));
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ List<SingleEventEmitter> finalSEEs() {
+ return finals;
+ }
+
+ @Override
+ List<SingleEventEmitter> nonFinalSEEs() {
+ return nonFinals;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record successful completion of a map attempt
+ *
+ */
+public class MapAttemptFinishedEvent implements HistoryEvent {
+ private TaskID taskId;
+ private TaskAttemptID attemptId;
+ private TaskType taskType;
+ private String taskStatus;
+ private long mapFinishTime;
+ private long finishTime;
+ private String hostname;
+ private String state;
+ private JhCounters counters;
+
+ /**
+ * Create an event for successful completion of map attempts
+ * @param id Task Attempt ID
+ * @param taskType Type of the task
+ * @param taskStatus Status of the task
+ * @param mapFinishTime Finish time of the map phase
+ * @param finishTime Finish time of the attempt
+ * @param hostname Name of the host where the map executed
+ * @param state State string for the attempt
+ * @param counters Counters for the attempt
+ */
+ public MapAttemptFinishedEvent(TaskAttemptID id,
+ TaskType taskType, String taskStatus,
+ long mapFinishTime, long finishTime,
+ String hostname, String state, Counters counters) {
+ this.taskId = id.getTaskID();
+ this.attemptId = id;
+ this.taskType = taskType;
+ this.taskStatus = taskStatus;
+ this.mapFinishTime = mapFinishTime;
+ this.finishTime = finishTime;
+ this.hostname = hostname;
+ this.state = state;
+ this.counters = new JhCounters(counters, "COUNTERS");
+ }
+
+ /** Get the task ID */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the attempt id */
+ public TaskAttemptID getAttemptId() {
+ return attemptId;
+ }
+ /** Get the task type */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Get the task status */
+ public String getTaskStatus() { return taskStatus; }
+ /** Get the map phase finish time */
+ public long getMapFinishTime() { return mapFinishTime; }
+ /** Get the attempt finish time */
+ public long getFinishTime() { return finishTime; }
+ /** Get the host name */
+ public String getHostname() { return hostname; }
+ /** Get the state string */
+ public String getState() { return state; }
+ /** Get the counters */
+ public JhCounters getCounters() { return counters; }
+ /** Get the event type */
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_FINISHED;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Interface to output a sequence of objects of type T.
+ */
+public interface Outputter<T> extends Closeable {
+ /**
+ * Initialize the {@link Outputter} to a specific path.
+ * @param path The {@link Path} to the output file.
+ * @param conf Configuration
+ * @throws IOException
+ */
+ public void init(Path path, Configuration conf) throws IOException;
+
+ /**
+ * Output an object.
+ * @param object The object to output.
+ * @throws IOException
+ */
+ public void output(T object) throws IOException;
+
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Mar 4 04:22:59 2011
@@ -37,8 +37,10 @@ import org.w3c.dom.Text;
import org.xml.sax.SAXException;
class ParsedConfigFile {
- private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
- private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+ private static final Pattern jobIDPattern =
+ Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+ private static final Pattern heapPattern =
+ Pattern.compile("-Xmx([0-9]+)([mMgG])");
final int heapMegabytes;
@@ -68,6 +70,7 @@ class ParsedConfigFile {
}
@SuppressWarnings("hiding")
+ @Deprecated
ParsedConfigFile(String filenameLine, String xmlString) {
super();
@@ -136,10 +139,11 @@ class ParsedConfigFile {
value = ((Text) field.getFirstChild()).getData();
}
if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
- finalParameter = "true".equals(((Text) field.getFirstChild())
- .getData());
+ finalParameter =
+ "true".equals(((Text) field.getFirstChild()).getData());
}
}
+
if ("mapred.child.java.opts".equals(attr) && value != null) {
Matcher matcher = heapPattern.matcher(value);
if (matcher.find()) {
@@ -153,7 +157,7 @@ class ParsedConfigFile {
}
}
- if ("mapred.job.queue.name".equals(attr) && value != null) {
+ if ("mapred.queue.name".equals(attr) && value != null) {
queue = value;
}
@@ -161,14 +165,15 @@ class ParsedConfigFile {
jobName = value;
}
- clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
- value, clusterMapMB);
- clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
- attr, value, clusterReduceMB);
- jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
- jobMapMB);
- jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
- value, jobReduceMB);
+ clusterMapMB =
+ maybeGetIntValue("mapreduce.cluster.mapmemory.mb", attr, value, clusterMapMB);
+ clusterReduceMB =
+ maybeGetIntValue("mapred.cluster.reduce.memory.mb", attr, value,
+ clusterReduceMB);
+ jobMapMB =
+ maybeGetIntValue("mapred.job.map.memory.mb", attr, value, jobMapMB);
+ jobReduceMB =
+ maybeGetIntValue("mapred.job.reduce.memory.mb", attr, value, jobReduceMB);
}
valid = true;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Mar 4 04:22:59 2011
@@ -24,10 +24,10 @@ class ParsedLine {
Properties content;
LogRecordType type;
- static final Pattern keyValPair = Pattern
- .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+ static final Pattern keyValPair =
+ Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
- @SuppressWarnings("unused")
+ @SuppressWarnings("unused")
ParsedLine(String fullLine, int version) {
super();
@@ -47,8 +47,16 @@ class ParsedLine {
String propValPairs = fullLine.substring(firstSpace + 1);
- while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
- propValPairs = propValPairs.substring(1);
+ int pvPairsFirstNonBlank = 0;
+ int pvPairsLength = propValPairs.length();
+
+ while (pvPairsLength > pvPairsFirstNonBlank
+ && propValPairs.charAt(pvPairsFirstNonBlank) == ' ') {
+ ++pvPairsFirstNonBlank;
+ }
+
+ if (pvPairsFirstNonBlank != 0) {
+ propValPairs = propValPairs.substring(pvPairsFirstNonBlank);
}
int cursor = 0;
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+
+class PossiblyDecompressedInputStream extends InputStream {
+ private final Decompressor decompressor;
+ private final InputStream coreInputStream;
+
+ public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
+ throws IOException {
+ CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
+ CompressionCodec inputCodec = codecs.getCodec(inputPath);
+
+ FileSystem ifs = inputPath.getFileSystem(conf);
+ FSDataInputStream fileIn = ifs.open(inputPath);
+
+ if (inputCodec == null) {
+ decompressor = null;
+ coreInputStream = fileIn;
+ } else {
+ decompressor = CodecPool.getDecompressor(inputCodec);
+ coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ return coreInputStream.read();
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ return coreInputStream.read(buffer, offset, length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+
+ coreInputStream.close();
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The purpose of this class is to generate new random seeds from a master
+ * seed. This is needed to make the Random().next*() calls in rumen and mumak
+ * deterministic so that mumak simulations become deterministically replayable.
+ *
+ * In these tools we need many independent streams of random numbers, some of
+ * which are created dynamically. We seed these streams with the sub-seeds
+ * returned by RandomSeedGenerator.
+ *
+ * For a slightly more complicated approach to generating multiple streams of
+ * random numbers with better theoretical guarantees, see
+ * P. L'Ecuyer, R. Simard, E. J. Chen, and W. D. Kelton,
+ * ``An Object-Oriented Random-Number Package with Many Long Streams and
+ * Substreams'', Operations Research, 50, 6 (2002), 1073--1075
+ * http://www.iro.umontreal.ca/~lecuyer/papers.html
+ * http://www.iro.umontreal.ca/~lecuyer/myftp/streams00/
+ */
+public class RandomSeedGenerator {
+ private static Log LOG = LogFactory.getLog(RandomSeedGenerator.class);
+
+ /** MD5 algorithm instance, one for each thread. */
+ private static final ThreadLocal<MessageDigest> md5Holder =
+ new ThreadLocal<MessageDigest>() {
+ @Override protected MessageDigest initialValue() {
+ MessageDigest md5 = null;
+ try {
+ md5 = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException nsae) {
+ throw new RuntimeException("Can't create MD5 digests", nsae);
+ }
+ return md5;
+ }
+ };
+
+ /**
+ * Generates a new random seed.
+ *
+ * @param streamId a string identifying the stream of random numbers
+ * @param masterSeed higher level master random seed
+ * @return the random seed. Different (streamId, masterSeed) pairs result in
+ * (vastly) different random seeds.
+ */
+ public static long getSeed(String streamId, long masterSeed) {
+ MessageDigest md5 = md5Holder.get();
+ md5.reset();
+ // The '/' separator ensures ('11',0) and ('1',10) do not yield the same str.
+ // We could have fed the bytes of masterSeed one by one to md5.update()
+ // instead
+ String str = streamId + '/' + masterSeed;
+ byte[] digest = md5.digest(str.getBytes());
+ // Create a long from the first 8 bytes of the digest
+ // This is fine as MD5 has the avalanche property.
+ // Paranoids could have XOR folded the other 8 bytes in too.
+ long seed = 0;
+ for (int i=0; i<8; i++) {
+ seed = (seed<<8) + ((int)digest[i]+128);
+ }
+ return seed;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class ReduceAttempt20LineHistoryEventEmitter extends
+ TaskAttempt20LineEventEmitter {
+
+ static List<SingleEventEmitter> nonFinals =
+ new LinkedList<SingleEventEmitter>();
+ static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+ static {
+ nonFinals.addAll(taskEventNonFinalSEEs);
+
+ finals.add(new ReduceAttemptFinishedEventEmitter());
+ }
+
+ ReduceAttempt20LineHistoryEventEmitter() {
+ super();
+ }
+
+ static private class ReduceAttemptFinishedEventEmitter extends
+ SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+ HistoryEventEmitter thatg) {
+ if (taskAttemptIDName == null) {
+ return null;
+ }
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+ String finishTime = line.get("FINISH_TIME");
+ String status = line.get("TASK_STATUS");
+
+ if (finishTime != null && status != null
+ && status.equalsIgnoreCase("success")) {
+ String hostName = line.get("HOSTNAME");
+ String counters = line.get("COUNTERS");
+ String state = line.get("STATE_STRING");
+ String shuffleFinish = line.get("SHUFFLE_FINISHED");
+ String sortFinish = line.get("SORT_FINISHED");
+
+ if (finishTime != null && shuffleFinish != null && sortFinish != null
+ && "success".equalsIgnoreCase(status)) {
+ ReduceAttempt20LineHistoryEventEmitter that =
+ (ReduceAttempt20LineHistoryEventEmitter) thatg;
+
+ return new ReduceAttemptFinishedEvent(taskAttemptID,
+ that.originalTaskType, status, Long.parseLong(shuffleFinish),
+ Long.parseLong(sortFinish), Long.parseLong(finishTime), hostName,
+ state, maybeParseCounters(counters));
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ List<SingleEventEmitter> finalSEEs() {
+ return finals;
+ }
+
+ @Override
+ List<SingleEventEmitter> nonFinalSEEs() {
+ return nonFinals;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record successful completion of a reduce attempt.
+ */
+public class ReduceAttemptFinishedEvent implements HistoryEvent {
+  // Identity of the attempt and of the task it belongs to.
+  private TaskAttemptID attemptId;
+  private TaskID taskId;
+  private TaskType taskType;
+
+  // Outcome as reported by the caller.
+  private String taskStatus;
+  private String state;
+  private String hostname;
+  private JhCounters counters;
+
+  // Phase timestamps, stored exactly as passed to the constructor.
+  private long shuffleFinishTime;
+  private long sortFinishTime;
+  private long finishTime;
+
+  /**
+   * Create an event to record completion of a reduce attempt
+   * @param id Attempt Id; the task id is derived from it
+   * @param taskType Type of task
+   * @param taskStatus Status of the task
+   * @param shuffleFinishTime Finish time of the shuffle phase
+   * @param sortFinishTime Finish time of the sort phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param state State of the attempt
+   * @param counters Counters for the attempt, wrapped in a
+   *          {@link JhCounters} named "COUNTERS"
+   */
+  public ReduceAttemptFinishedEvent(TaskAttemptID id,
+      TaskType taskType, String taskStatus,
+      long shuffleFinishTime, long sortFinishTime,
+      long finishTime,
+      String hostname, String state, Counters counters) {
+    this.attemptId = id;
+    this.taskId = id.getTaskID();
+    this.taskType = taskType;
+
+    this.taskStatus = taskStatus;
+    this.state = state;
+    this.hostname = hostname;
+    this.counters = new JhCounters(counters, "COUNTERS");
+
+    this.shuffleFinishTime = shuffleFinishTime;
+    this.sortFinishTime = sortFinishTime;
+    this.finishTime = finishTime;
+  }
+
+  /** Get the Task ID */
+  public TaskID getTaskId() {
+    return taskId;
+  }
+
+  /** Get the attempt id */
+  public TaskAttemptID getAttemptId() {
+    return attemptId;
+  }
+
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** Get the task status */
+  public String getTaskStatus() {
+    return taskStatus;
+  }
+
+  /** Get the finish time of the sort phase */
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  /** Get the finish time of the shuffle phase */
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  /** Get the finish time of the attempt */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** Get the name of the host where the attempt ran */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /** Get the state string */
+  public String getState() {
+    return state;
+  }
+
+  /** Get the counters for the attempt */
+  public JhCounters getCounters() {
+    return counters;
+  }
+
+  /** Get the event type, always {@code REDUCE_ATTEMPT_FINISHED} */
+  public EventType getEventType() {
+    return EventType.REDUCE_ATTEMPT_FINISHED;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A simple wrapper class to make any input stream "rewindable". It could be
+ * made more memory efficient by grow the internal buffer adaptively.
+ */
+public class RewindableInputStream extends InputStream {
+ private InputStream input;
+
+ /**
+ * Constructor.
+ *
+ * @param input
+ */
+ public RewindableInputStream(InputStream input) {
+ this(input, 1024 * 1024);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param input
+ * input stream.
+ * @param maxBytesToRemember
+ * Maximum number of bytes we need to remember at the beginning of
+ * the stream. If {@link #rewind()} is called after so many bytes are
+ * read from the stream, {@link #rewind()} would fail.
+ */
+ public RewindableInputStream(InputStream input, int maxBytesToRemember) {
+ this.input = new BufferedInputStream(input, maxBytesToRemember);
+ this.input.mark(maxBytesToRemember);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return input.read();
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ return input.read(buffer, offset, length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+ public InputStream rewind() throws IOException {
+ try {
+ input.reset();
+ return this;
+ } catch (IOException e) {
+ throw new IOException("Unable to rewind the stream", e);
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Inspects one parsed history line and optionally turns it into a single
+ * {@link HistoryEvent}. Concrete emitters are registered in the final and
+ * non-final emitter lists of the {@link HistoryEventEmitter} subclasses.
+ */
+abstract class SingleEventEmitter {
+ /**
+  * @param line the parsed history line to inspect
+  * @param name the id of the entity the line describes, in string form (the
+  *          implementations in this change pass a task id or a task attempt
+  *          id and return null when it is null)
+  * @param that the emitter on whose behalf this call is made; the
+  *          implementations in this change downcast it to read or update
+  *          per-emitter state
+  * @return the event for this line, or null when the line does not match
+  */
+ abstract HistoryEvent maybeEmitEvent(ParsedLine line, String name,
+ HistoryEventEmitter that);
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event emitter for task-level history lines. It remembers the start time and
+ * task type taken from a start line so that finish and failure events emitted
+ * through the same instance can reuse the task type.
+ */
+public class Task20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  // Both stay null until a start line has been processed.
+  Long originalStartTime = null;
+  TaskType originalTaskType = null;
+
+  static {
+    nonFinals.add(new TaskStartedEventEmitter());
+    nonFinals.add(new TaskUpdatedEventEmitter());
+
+    finals.add(new TaskFinishedEventEmitter());
+    finals.add(new TaskFailedEventEmitter());
+  }
+
+  protected Task20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /**
+   * Emits a {@link TaskStartedEvent} for a line carrying START_TIME and
+   * TASK_TYPE, and records both on the owning emitter.
+   */
+  static private class TaskStartedEventEmitter extends SingleEventEmitter {
+    @Override
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String taskType = line.get("TASK_TYPE");
+      String startTime = line.get("START_TIME");
+      String splits = line.get("SPLITS");
+
+      if (startTime == null || taskType == null) {
+        return null;
+      }
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      // Remember what the start line said; later emitters read these fields.
+      owner.originalStartTime = Long.parseLong(startTime);
+      owner.originalTaskType =
+          Version20LogInterfaceUtils.get20TaskType(taskType);
+
+      return new TaskStartedEvent(id, owner.originalStartTime,
+          owner.originalTaskType, splits);
+    }
+  }
+
+  /** Emits a {@link TaskUpdatedEvent} for any line carrying FINISH_TIME. */
+  static private class TaskUpdatedEventEmitter extends SingleEventEmitter {
+    @Override
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      if (finishTime == null) {
+        return null;
+      }
+
+      return new TaskUpdatedEvent(id, Long.parseLong(finishTime));
+    }
+  }
+
+  /**
+   * Emits a {@link TaskFinishedEvent} for a line with FINISH_TIME, no ERROR
+   * and a TASK_STATUS of "success" (case-insensitive), provided a task type
+   * was recorded earlier.
+   */
+  static private class TaskFinishedEventEmitter extends SingleEventEmitter {
+    @Override
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String status = line.get("TASK_STATUS");
+      String finishTime = line.get("FINISH_TIME");
+      String error = line.get("ERROR");
+      String counters = line.get("COUNTERS");
+
+      if (finishTime == null || error != null || status == null
+          || !status.equalsIgnoreCase("success")) {
+        return null;
+      }
+
+      Counters parsedCounters = maybeParseCounters(counters);
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      // Without a recorded task type no finished event is produced.
+      if (owner.originalTaskType == null) {
+        return null;
+      }
+
+      return new TaskFinishedEvent(id, Long.parseLong(finishTime),
+          owner.originalTaskType, status, parsedCounters);
+    }
+  }
+
+  /**
+   * Emits a {@link TaskFailedEvent} for a line with FINISH_TIME that either
+   * carries an ERROR or has a TASK_STATUS other than "success".
+   */
+  static private class TaskFailedEventEmitter extends SingleEventEmitter {
+    @Override
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String status = line.get("TASK_STATUS");
+      String finishTime = line.get("FINISH_TIME");
+      String taskType = line.get("TASK_TYPE");
+      String error = line.get("ERROR");
+
+      boolean unsuccessful = error != null
+          || (status != null && !status.equalsIgnoreCase("success"));
+      if (finishTime == null || !unsuccessful) {
+        return null;
+      }
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      // Prefer the type recorded from the start line; fall back to the type
+      // named on this line when no start line was seen.
+      TaskType effectiveType;
+      if (owner.originalTaskType == null) {
+        effectiveType = Version20LogInterfaceUtils.get20TaskType(taskType);
+      } else {
+        effectiveType = owner.originalTaskType;
+      }
+
+      return new TaskFailedEvent(id, Long.parseLong(finishTime),
+          effectiveType, error, status, null);
+    }
+  }
+
+  /** @return the emitters registered in {@code finals} */
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  /** @return the emitters registered in {@code nonFinals} */
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Base emitter for task-attempt history lines. It supplies the single-event
+ * emitters for attempt start, successful finish and unsuccessful completion,
+ * and remembers the start time and task type taken from a start line so that
+ * later events emitted through the same instance can reuse the task type.
+ */
+public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter {
+  static List<SingleEventEmitter> taskEventNonFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+  // Not populated by this class.
+  static List<SingleEventEmitter> taskEventFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+
+  // Port reported when a start line has an empty or missing HTTP_PORT.
+  static private final int DEFAULT_HTTP_PORT = 80;
+
+  // Both stay null until a start line has been processed.
+  Long originalStartTime = null;
+  org.apache.hadoop.mapreduce.TaskType originalTaskType = null;
+
+  static {
+    taskEventNonFinalSEEs.add(new TaskAttemptStartedEventEmitter());
+    taskEventNonFinalSEEs.add(new TaskAttemptFinishedEventEmitter());
+    taskEventNonFinalSEEs
+        .add(new TaskAttemptUnsuccessfulCompletionEventEmitter());
+  }
+
+  protected TaskAttempt20LineEventEmitter() {
+    super();
+  }
+
+  /**
+   * Emits a {@link TaskAttemptStartedEvent} for a line carrying START_TIME
+   * and TASK_TYPE, and records both on the owning emitter.
+   */
+  static private class TaskAttemptStartedEventEmitter extends
+      SingleEventEmitter {
+    /**
+     * @param line the parsed history line to inspect
+     * @param taskAttemptIDName the attempt id in string form; may be null
+     * @param thatg the owning emitter, cast to
+     *          {@link TaskAttempt20LineEventEmitter}
+     * @return the started event, or null when the line does not qualify
+     */
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String startTime = line.get("START_TIME");
+      String taskType = line.get("TASK_TYPE");
+      String trackerName = line.get("TRACKER_NAME");
+      String httpPort = line.get("HTTP_PORT");
+
+      if (startTime != null && taskType != null) {
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        that.originalStartTime = Long.parseLong(startTime);
+        that.originalTaskType =
+            Version20LogInterfaceUtils.get20TaskType(taskType);
+
+        // HTTP_PORT may be absent from the line, in which case get() yields
+        // null; treat that like an empty value instead of dereferencing it.
+        int port =
+            (httpPort == null || httpPort.equals("")) ? DEFAULT_HTTP_PORT
+                : Integer.parseInt(httpPort);
+
+        return new TaskAttemptStartedEvent(taskAttemptID,
+            that.originalTaskType, that.originalStartTime, trackerName, port);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link TaskAttemptFinishedEvent} for a line with FINISH_TIME and
+   * a TASK_STATUS of "success" (case-insensitive).
+   */
+  static private class TaskAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    /**
+     * @param line the parsed history line to inspect
+     * @param taskAttemptIDName the attempt id in string form; may be null
+     * @param thatg the owning emitter, cast to
+     *          {@link TaskAttempt20LineEventEmitter}
+     * @return the finished event, or null when the line does not qualify
+     */
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && status.equalsIgnoreCase("success")) {
+        String hostName = line.get("HOSTNAME");
+        String counters = line.get("COUNTERS");
+        String state = line.get("STATE_STRING");
+
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        return new TaskAttemptFinishedEvent(taskAttemptID,
+            that.originalTaskType, status, Long.parseLong(finishTime),
+            hostName, state, maybeParseCounters(counters));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link TaskAttemptUnsuccessfulCompletionEvent} for a line with
+   * FINISH_TIME and a TASK_STATUS other than "success".
+   */
+  static private class TaskAttemptUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    /**
+     * @param line the parsed history line to inspect
+     * @param taskAttemptIDName the attempt id in string form; may be null
+     * @param thatg the owning emitter, cast to
+     *          {@link TaskAttempt20LineEventEmitter}
+     * @return the completion event, or null when the line does not qualify
+     */
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && !status.equalsIgnoreCase("success")) {
+        String hostName = line.get("HOSTNAME");
+        String error = line.get("ERROR");
+
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
+            that.originalTaskType, status, Long.parseLong(finishTime),
+            hostName, error);
+      }
+
+      return null;
+    }
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record successful task completion
+ *
+ */
+public class TaskAttemptFinishedEvent implements HistoryEvent {
+  private TaskID taskId;
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String taskStatus;
+  private long finishTime;
+  private String hostname;
+  private String state;
+  private JhCounters counters;
+
+  /**
+   * Create an event to record successful finishes for setup and cleanup
+   * attempts
+   * @param id Attempt ID; the task id is derived from it
+   * @param taskType Type of task
+   * @param taskStatus Status of task
+   * @param finishTime Finish time of attempt
+   * @param hostname Host where the attempt executed
+   * @param state State string
+   * @param counters Counters for the attempt, wrapped in a
+   *          {@link JhCounters} named "COUNTERS"
+   */
+  public TaskAttemptFinishedEvent(TaskAttemptID id,
+      TaskType taskType, String taskStatus,
+      long finishTime,
+      String hostname, String state, Counters counters) {
+    this.taskId = id.getTaskID();
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.taskStatus = taskStatus;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.state = state;
+    this.counters = new JhCounters(counters, "COUNTERS");
+  }
+
+  /** Get the task ID */
+  public TaskID getTaskId() { return taskId; }
+  /** Get the task attempt id */
+  public TaskAttemptID getAttemptId() {
+    return attemptId;
+  }
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+  /** Get the task status */
+  public String getTaskStatus() { return taskStatus; }
+  /** Get the attempt finish time */
+  public long getFinishTime() { return finishTime; }
+  /**
+   * Get the host where the attempt executed. Returned as stored, so this is
+   * null when null was passed to the constructor (calling toString() on the
+   * field would throw in that case).
+   */
+  public String getHostname() { return hostname; }
+  /**
+   * Get the state string. Returned as stored, so this is null when null was
+   * passed to the constructor.
+   */
+  public String getState() { return state; }
+  /** Get the counters for the attempt */
+  public JhCounters getCounters() { return counters; }
+  /**
+   * Get the event type.
+   * NOTE(review): this is MAP_ATTEMPT_FINISHED regardless of taskType --
+   * confirm that consumers do not rely on it to tell map from non-map
+   * attempts.
+   */
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_FINISHED;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record start of a task attempt.
+ */
+public class TaskAttemptStartedEvent implements HistoryEvent {
+  // Identity of the attempt and of the task it belongs to.
+  private TaskAttemptID attemptId;
+  private TaskID taskId;
+  private TaskType taskType;
+
+  // Where and when the attempt started.
+  private String trackerName;
+  private int httpPort;
+  private long startTime;
+
+  /**
+   * Create an event to record the start of an attempt
+   * @param attemptId Id of the attempt; the task id is derived from it
+   * @param taskType Type of task
+   * @param startTime Start time of the attempt
+   * @param trackerName Name of the Task Tracker where attempt is running
+   * @param httpPort The port number of the tracker
+   */
+  public TaskAttemptStartedEvent(TaskAttemptID attemptId,
+      TaskType taskType, long startTime, String trackerName,
+      int httpPort) {
+    this.attemptId = attemptId;
+    this.taskId = attemptId.getTaskID();
+    this.taskType = taskType;
+
+    this.trackerName = trackerName;
+    this.httpPort = httpPort;
+    this.startTime = startTime;
+  }
+
+  /** Get the task id */
+  public TaskID getTaskId() {
+    return taskId;
+  }
+
+  /** Get the tracker name */
+  public String getTrackerName() {
+    return trackerName;
+  }
+
+  /** Get the start time */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** Get the HTTP port */
+  public int getHttpPort() {
+    return httpPort;
+  }
+
+  /** Get the attempt id */
+  public TaskAttemptID getTaskAttemptId() {
+    return attemptId;
+  }
+
+  /** Get the event type, always {@code MAP_ATTEMPT_STARTED} */
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_STARTED;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record unsuccessful (killed or failed) completion of a task attempt.
+ * Plain value holder: fields are assigned in the constructor and read through getters.
+ */
+public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
+ private TaskID taskId;
+ private TaskType taskType;
+ private TaskAttemptID attemptId;
+ private long finishTime;
+ private String hostname;
+ private String error;
+ private String status;
+
+ /**
+ * Create an event to record the unsuccessful completion of an attempt.
+ * @param id Attempt ID; must not be null, the task id is derived from it
+ * @param taskType Type of the task
+ * @param status Status of the attempt
+ * @param finishTime Finish time of the attempt
+ * @param hostname Name of the host where the attempt executed
+ * @param error Error string
+ */
+ public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
+ TaskType taskType,
+ String status, long finishTime,
+ String hostname, String error) {
+ this.taskId = id.getTaskID(); // owning task id comes from the attempt id
+ this.taskType = taskType;
+ this.attemptId = id;
+ this.finishTime = finishTime;
+ this.hostname = hostname;
+ this.error = error;
+ this.status = status;
+ }
+
+ /** Get the id of the task this attempt belongs to (taken from the attempt id) */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the task type that was given at construction */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Get the id of the attempt this event is about */
+ public TaskAttemptID getTaskAttemptId() {
+ return attemptId;
+ }
+ /** Get the finish time of the attempt */
+ public long getFinishTime() { return finishTime; }
+ /** Get the name of the host where the attempt executed */
+ public String getHostname() { return hostname; }
+ /** Get the error string */
+ public String getError() { return error; }
+ /** Get the status string of the attempt, as given at construction */
+ public String getTaskStatus() { return status; }
+ /** Get the event type: always {@code MAP_ATTEMPT_KILLED}, whatever taskType and status are */
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_KILLED; // NOTE(review): ignores taskType and status — confirm this is intended for reduce or failed attempts
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the failure of a task.
+ * Plain value holder: fields are assigned in the constructor and read through getters.
+ */
+public class TaskFailedEvent implements HistoryEvent {
+ private TaskID taskId;
+ private String error;
+ private long finishTime;
+ private TaskType taskType;
+ private TaskAttemptID failedDueToAttempt;
+ private String status;
+
+ /**
+ * Create an event to record task failure.
+ * @param id Task ID
+ * @param finishTime Finish time of the task
+ * @param taskType Type of the task
+ * @param error Error string
+ * @param status Status string of the task
+ * @param failedDueToAttempt The attempt id due to which the task failed
+ */
+ public TaskFailedEvent(TaskID id, long finishTime,
+ TaskType taskType, String error, String status,
+ TaskAttemptID failedDueToAttempt) {
+ this.taskId = id;
+ this.error = error;
+ this.finishTime = finishTime;
+ this.taskType = taskType;
+ this.failedDueToAttempt = failedDueToAttempt;
+ this.status = status;
+ }
+
+ /** Get the task id */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the error string */
+ public String getError() { return error; }
+ /** Get the finish time of the task */
+ public long getFinishTime() { return finishTime; }
+ /** Get the task type */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Get the attempt id due to which the task failed, exactly as given at construction */
+ public TaskAttemptID getFailedAttemptID() {
+ return failedDueToAttempt;
+ }
+ /** Get the task status string */
+ public String getTaskStatus() { return status; }
+ /** Get the event type: always {@code TASK_FAILED} */
+ public EventType getEventType() { return EventType.TASK_FAILED; }
+
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+
+/**
+ * Event to record the successful completion of a task.
+ * The counters are kept as a {@code JhCounters} built from the constructor argument.
+ */
+public class TaskFinishedEvent implements HistoryEvent {
+ private TaskID taskId;
+ private long finishTime;
+ private JhCounters counters;
+ private TaskType taskType;
+ private String status;
+
+ /**
+ * Create an event to record the successful completion of a task.
+ * @param id Task ID
+ * @param finishTime Finish time of the task
+ * @param taskType Type of the task
+ * @param status Status string
+ * @param counters Counters for the task; converted to a {@code JhCounters} here
+ */
+ public TaskFinishedEvent(TaskID id, long finishTime,
+ TaskType taskType,
+ String status, Counters counters) {
+ this.taskId = id;
+ this.finishTime = finishTime;
+ this.counters = new JhCounters(counters, "COUNTERS"); // "COUNTERS" is presumably the name given to the converted record — TODO confirm against JhCounters
+ this.taskType = taskType;
+ this.status = status;
+ }
+
+ /** Get the task id */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the task finish time */
+ public long getFinishTime() { return finishTime; }
+ /** Get the task counters in their converted {@code JhCounters} form */
+ public JhCounters getCounters() { return counters; }
+ /** Get the task type */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Get the task status string */
+ public String getTaskStatus() { return status; }
+ /** Get the event type: always {@code TASK_FINISHED} */
+ public EventType getEventType() {
+ return EventType.TASK_FINISHED;
+ }
+
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the start of a task.
+ * Plain value holder: fields are assigned in the constructor and read through getters.
+ */
+public class TaskStartedEvent implements HistoryEvent {
+ private TaskID taskId;
+ private String splitLocations;
+ private long startTime;
+ private TaskType taskType;
+
+ /**
+ * Create an event to record the start of a task.
+ * @param id Task Id
+ * @param startTime Start time of the task
+ * @param taskType Type of the task
+ * @param splitLocations Split locations, applicable for map tasks; may be null ({@link TopologyBuilder} skips null and splits the string on commas)
+ */
+ public TaskStartedEvent(TaskID id, long startTime,
+ TaskType taskType, String splitLocations) {
+ this.taskId = id;
+ this.splitLocations = splitLocations;
+ this.startTime = startTime;
+ this.taskType = taskType;
+ }
+
+ /** Get the task id */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the split locations string, applicable for map tasks, unchanged from construction */
+ public String getSplitLocations() { return splitLocations; }
+ /** Get the start time of the task */
+ public long getStartTime() { return startTime; }
+ /** Get the task type */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Get the event type: always {@code TASK_STARTED} */
+ public EventType getEventType() {
+ return EventType.TASK_STARTED;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Event to record updates to a task.
+ * The only updated value carried here is the task's finish time.
+ */
+public class TaskUpdatedEvent implements HistoryEvent {
+ private TaskID taskId;
+ private long finishTime;
+
+ /**
+ * Create an event to record a task update.
+ * @param id Id of the task
+ * @param finishTime Finish time of the task
+ */
+ public TaskUpdatedEvent(TaskID id, long finishTime) {
+ this.taskId = id;
+ this.finishTime = finishTime;
+ }
+
+ /** Get the task ID */
+ public TaskID getTaskId() { return taskId; }
+ /** Get the task finish time */
+ public long getFinishTime() { return finishTime; }
+ /** Get the event type: always {@code TASK_UPDATED} */
+ public EventType getEventType() {
+ return EventType.TASK_UPDATED;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * Builds the cluster topology from the hosts named in the processed history events.
+ */
+public class TopologyBuilder {
+ private Set<ParsedHost> allHosts = new HashSet<ParsedHost>(); // every distinct host recorded so far
+
+ /**
+ * Process one {@link HistoryEvent}
+ * Only attempt-finished, attempt-unsuccessful and task-started events are used; others are ignored.
+ * @param event
+ * The {@link HistoryEvent} to be processed.
+ */
+ public void process(HistoryEvent event) {
+ if (event instanceof TaskAttemptFinishedEvent) {
+ processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+ } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+ processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
+ } else if (event instanceof TaskStartedEvent) {
+ processTaskStartedEvent((TaskStartedEvent) event);
+ }
+
+ // Deliberately not exhaustive: any other kind of event falls through and is ignored.
+ }
+
+ /**
+ * Process a collection of JobConf {@link Properties}. We do not restrict it
+ * to be called once. The current implementation does nothing with them.
+ *
+ * @param conf
+ * The job conf properties to be added.
+ */
+ public void process(Properties conf) {
+ // no code: the properties are ignored
+ }
+
+ /**
+ * Request the builder to build the final object. Callers should not feed the
+ * {@link TopologyBuilder} more events or job-conf properties afterwards (not enforced here).
+ *
+ * @return {@link LoggedNetworkTopology} constructed from the hosts recorded so far.
+ */
+ public LoggedNetworkTopology build() {
+ return new LoggedNetworkTopology(allHosts); // passes the live set itself, not a copy
+ }
+
+ private void processTaskStartedEvent(TaskStartedEvent event) { // records each split location as a host
+ preferredLocationForSplits(event.getSplitLocations());
+ }
+
+ private void processTaskAttemptUnsuccessfulCompletionEvent(
+ TaskAttemptUnsuccessfulCompletionEvent event) {
+ recordParsedHost(event.getHostname()); // host where the unsuccessful attempt executed
+ }
+
+ private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+ recordParsedHost(event.getHostname()); // host reported by the finished attempt
+ }
+
+ private void recordParsedHost(String hostName) { // parses hostName and remembers the result, if any
+ ParsedHost result = ParsedHost.parse(hostName);
+
+ if (result != null && !allHosts.contains(result)) { // a null parse result is skipped
+ allHosts.add(result); // Set.add alone would already ignore duplicates; the contains() check is redundant but harmless
+ }
+ }
+
+ private void preferredLocationForSplits(String splits) { // records every comma-separated entry of splits as a host
+ if (splits != null) {
+ StringTokenizer tok = new StringTokenizer(splits, ",", false); // false: the commas themselves are not returned as tokens
+
+ while (tok.hasMoreTokens()) {
+ String nextSplit = tok.nextToken();
+
+ recordParsedHost(nextSplit);
+ }
+ }
+ }
+}