You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC
svn commit: r1077515 [2/4] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/
test/tools/data/rumen/small-trace-test/
test/tools/data/rumen/small-trace-test/counters-fo...
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Common interface of the history events handled by the rumen tools. The only
+ * thing an event has to expose is its {@link EventType}.
+ */
+public interface HistoryEvent {
+ /** Return this event's type. */
+ EventType getEventType();
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
+
+/**
+ * Base class for objects that turn one parsed history line into zero or more
+ * {@link HistoryEvent}s by delegating to {@link SingleEventEmitter}s.
+ *
+ * Subclasses supply two lists of single-event emitters: the non-final ones,
+ * all of which are tried on every line, and the final ones, of which the
+ * first to produce an event ends the scan and marks this emitter for removal.
+ */
+abstract class HistoryEventEmitter {
+  static final private Log LOG = LogFactory.getLog(HistoryEventEmitter.class);
+
+  /** @return the emitters that are all tried, in order, on every line. */
+  abstract List<SingleEventEmitter> nonFinalSEEs();
+
+  /**
+   * @return the emitters tried after the non-final ones; the first event one
+   *         of them produces yields {@link PostEmitAction#REMOVE_HEE}.
+   */
+  abstract List<SingleEventEmitter> finalSEEs();
+
+  protected HistoryEventEmitter() {
+    // no code
+  }
+
+  /** Tells the caller of emitterCore what to do with this emitter. */
+  enum PostEmitAction {
+    NONE, REMOVE_HEE
+  }
+
+  /**
+   * Offers one parsed line to every single-event emitter of this object.
+   *
+   * @param line
+   *          the parsed line, passed through to each emitter
+   * @param name
+   *          passed through to each emitter unchanged
+   * @return the events produced, in emission order, paired with
+   *         {@link PostEmitAction#REMOVE_HEE} if a final emitter produced an
+   *         event and {@link PostEmitAction#NONE} otherwise
+   */
+  final Pair<Queue<HistoryEvent>, PostEmitAction> emitterCore(ParsedLine line,
+      String name) {
+    Queue<HistoryEvent> results = new LinkedList<HistoryEvent>();
+    PostEmitAction removeEmitter = PostEmitAction.NONE;
+
+    // Every non-final emitter gets a chance; null means "nothing to emit".
+    for (SingleEventEmitter see : nonFinalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+      }
+    }
+
+    // Only the first final emitter that fires is honoured.
+    for (SingleEventEmitter see : finalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+        removeEmitter = PostEmitAction.REMOVE_HEE;
+        break;
+      }
+    }
+
+    return new Pair<Queue<HistoryEvent>, PostEmitAction>(results, removeEmitter);
+  }
+
+  /**
+   * Lenient variant of {@link #parseCounters(String)}: a badly formatted
+   * string is logged and reported as {@code null} instead of an exception.
+   *
+   * @param counters
+   *          the counter string to parse; may be null
+   * @return the parsed counters, or null if the string is null or malformed
+   */
+  protected static Counters maybeParseCounters(String counters) {
+    try {
+      return parseCounters(counters);
+    } catch (ParseException e) {
+      // Hand the exception to the logger so the cause is not lost.
+      LOG.warn("The counter string, \"" + counters + "\" is badly formatted.",
+          e);
+      return null;
+    }
+  }
+
+  /**
+   * Parses a counter string into a {@link Counters} object.
+   *
+   * @param counters
+   *          the counter string; null is logged and yields null
+   * @return the parsed counters, or null if {@code counters} is null
+   * @throws ParseException
+   *           if the (rewritten) string cannot be parsed by
+   *           {@code fromEscapedCompactString}
+   */
+  protected static Counters parseCounters(String counters)
+      throws ParseException {
+    if (counters == null) {
+      LOG.warn("HistoryEventEmitters: null counter detected:");
+      return null;
+    }
+
+    // String.replace is literal (not regex): a backslash-dot gains a second
+    // backslash, while double-backslash before ( ) [ ] is reduced to a single
+    // one. NOTE(review): presumably this adapts the log's escaping to what
+    // fromEscapedCompactString expects - confirm.
+    counters = counters.replace("\\.", "\\\\.");
+    counters = counters.replace("\\\\(", "\\(");
+    counters = counters.replace("\\\\)", "\\)");
+    counters = counters.replace("\\\\[", "\\[");
+    counters = counters.replace("\\\\]", "\\]");
+
+    org.apache.hadoop.mapred.Counters depForm =
+        org.apache.hadoop.mapred.Counters.fromEscapedCompactString(counters);
+
+    return new Counters(depForm);
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link InputDemuxer} demultiplexes the input files into individual input
+ * streams.
+ */
+public interface InputDemuxer extends Closeable {
+ /**
+ * Bind the {@link InputDemuxer} to a particular file.
+ *
+ * @param path
+ * The path to the file it should bind to.
+ * @param conf
+ * Configuration
+ * @throws IOException
+ * reserved for read errors.
+ *
+ * NOTE(review): this method returns void, so it cannot report a
+ * readable-but-wrong-format file through its return value. How
+ * that case is signalled is not visible here - confirm against
+ * the implementations.
+ */
+ public void bindTo(Path path, Configuration conf) throws IOException;
+
+ /**
+ * Get the next &lt;name, input&gt; pair. The name should preserve the
+ * original job history file or job conf file name. The input object should
+ * be closed before calling getNext() again. The old input object would be
+ * invalid after calling getNext() again.
+ *
+ * @return the next &lt;name, input&gt; pair.
+ */
+ public Pair<String, InputStream> getNext() throws IOException;
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Plain holder for one counter: its name, its display name and its value.
+ * All fields are public and mutable.
+ */
+public class JhCounter {
+ /** The counter's name. */
+ public String name;
+ /** The counter's display name. */
+ public String displayName;
+ /** The counter's value. */
+ public long value;
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.List;
+
+/**
+ * Plain holder for one group of counters: the group's name, its display name
+ * and the {@link JhCounter}s it contains. All fields are public and mutable.
+ */
+public class JhCounterGroup {
+ /** The group's name. */
+ public String name;
+ /** The group's display name. */
+ public String displayName;
+ /** The counters belonging to this group. */
+ public List<JhCounter> counts;
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+/**
+ * A named, plain-field copy of a {@link Counters} object: one
+ * {@link JhCounterGroup} per counter group, each holding one
+ * {@link JhCounter} per counter.
+ */
+public class JhCounters {
+  public String name;
+  public List<JhCounterGroup> groups;
+
+  /**
+   * Copies every group and counter out of {@code counters}.
+   *
+   * @param counters
+   *          the counters to copy; when null, {@link #groups} stays empty
+   * @param name
+   *          stored unchanged in {@link #name}
+   */
+  JhCounters(Counters counters, String name) {
+    this.name = name;
+    this.groups = new ArrayList<JhCounterGroup>();
+
+    if (counters != null) {
+      for (CounterGroup sourceGroup : counters) {
+        this.groups.add(copyGroup(sourceGroup));
+      }
+    }
+  }
+
+  /** Builds the holder for one group, including all of its counters. */
+  private static JhCounterGroup copyGroup(CounterGroup sourceGroup) {
+    JhCounterGroup copy = new JhCounterGroup();
+    copy.name = sourceGroup.getName();
+    copy.displayName = sourceGroup.getDisplayName();
+    copy.counts = new ArrayList<JhCounter>(sourceGroup.size());
+    for (Counter sourceCounter : sourceGroup) {
+      copy.counts.add(copyCounter(sourceCounter));
+    }
+    return copy;
+  }
+
+  /** Builds the holder for a single counter. */
+  private static JhCounter copyCounter(Counter sourceCounter) {
+    JhCounter copy = new JhCounter();
+    copy.name = sourceCounter.getName();
+    copy.displayName = sourceCounter.getDisplayName();
+    copy.value = sourceCounter.getValue();
+    return copy;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * {@link HistoryEventEmitter} for job-level history lines. Each nested
+ * {@link SingleEventEmitter} inspects the attributes of one parsed line and
+ * produces the matching job event, or null when the line does not carry the
+ * attributes it needs.
+ *
+ * NOTE(review): judging by the class name this targets the 0.20 line-based
+ * history format - confirm.
+ */
+public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  /** Submit time seen on the SUBMIT_TIME line; null until one is parsed. */
+  Long originalSubmitTime = null;
+
+  static {
+    nonFinals.add(new JobSubmittedEventEmitter());
+    nonFinals.add(new JobPriorityChangeEventEmitter());
+    nonFinals.add(new JobStatusChangedEventEmitter());
+    nonFinals.add(new JobInitedEventEmitter());
+    nonFinals.add(new JobInfoChangeEventEmitter());
+
+    finals.add(new JobUnsuccessfulCompletionEventEmitter());
+    finals.add(new JobFinishedEventEmitter());
+  }
+
+  Job20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /**
+   * Parses a count attribute, treating an absent attribute as zero so that a
+   * line lacking an optional count does not abort with a
+   * NumberFormatException.
+   */
+  private static int parseCountOrZero(String value) {
+    return value == null ? 0 : Integer.parseInt(value);
+  }
+
+  /** Emits a JobSubmittedEvent for lines that carry SUBMIT_TIME. */
+  static private class JobSubmittedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      // Check for null before parsing the ID, like the other emitters do.
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String submitTime = line.get("SUBMIT_TIME");
+      String jobConf = line.get("JOBCONF");
+      String user = line.get("USER");
+      String jobName = line.get("JOBNAME");
+
+      if (submitTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+
+        // Remembered so that a later LAUNCH_TIME line can report it again.
+        that.originalSubmitTime = Long.parseLong(submitTime);
+
+        Map<JobACL, AccessControlList> jobACLs =
+            new HashMap<JobACL, AccessControlList>();
+        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
+            : user, that.originalSubmitTime, jobConf, jobACLs);
+      }
+
+      return null;
+    }
+  }
+
+  /** Emits a JobPriorityChangeEvent for lines that carry JOB_PRIORITY. */
+  static private class JobPriorityChangeEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      // Check for null before parsing the ID, like the other emitters do.
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String priority = line.get("JOB_PRIORITY");
+
+      if (priority != null) {
+        return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a JobInitedEvent for lines that carry LAUNCH_TIME, TOTAL_MAPS and
+   * TOTAL_REDUCES; JOB_STATUS is passed along and may be null.
+   */
+  static private class JobInitedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+      String status = line.get("JOB_STATUS");
+      String totalMaps = line.get("TOTAL_MAPS");
+      String totalReduces = line.get("TOTAL_REDUCES");
+
+      if (launchTime != null && totalMaps != null && totalReduces != null) {
+        return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
+            .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  /** Emits a JobStatusChangedEvent for lines that carry JOB_STATUS. */
+  static private class JobStatusChangedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String status = line.get("JOB_STATUS");
+
+      if (status != null) {
+        return new JobStatusChangedEvent(jobID, status);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a JobInfoChangeEvent for lines that carry LAUNCH_TIME, pairing the
+   * launch time with the submit time remembered by the enclosing emitter.
+   */
+  static private class JobInfoChangeEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+
+      if (launchTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+        return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
+            .parseLong(launchTime));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Final emitter: emits a JobUnsuccessfulCompletionEvent for lines whose
+   * JOB_STATUS is anything but "success" (ignoring case) and that also carry
+   * FINISH_TIME, FINISHED_MAPS and FINISHED_REDUCES.
+   */
+  static private class JobUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      if (status != null && !status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null) {
+        return new JobUnsuccessfulCompletionEvent(jobID, Long
+            .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
+            .parseInt(finishedReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Final emitter: emits a JobFinishedEvent for lines whose JOB_STATUS is
+   * "success" (ignoring case) and that also carry FINISH_TIME, FINISHED_MAPS
+   * and FINISHED_REDUCES. FAILED_MAPS and FAILED_REDUCES are not part of the
+   * trigger condition, so a missing value is counted as zero.
+   */
+  static private class JobFinishedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      String failedMaps = line.get("FAILED_MAPS");
+      String failedReduces = line.get("FAILED_REDUCES");
+
+      String counters = line.get("COUNTERS");
+
+      if (status != null && status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null) {
+        return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
+            .parseInt(finishedMaps), Integer.parseInt(finishedReduces),
+            parseCountOrZero(failedMaps), parseCountOrZero(failedReduces),
+            null, null, maybeParseCounters(counters));
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link JobBuilder} builds one job. It processes a sequence of
+ * {@link HistoryEvent}s.
+ */
+public class JobBuilder {
+ // The value TraditionalBinaryPrefix assigns to "1m"; used as the divisor
+ // that converts a parsed heap size into megabytes.
+ private static final long BYTES_IN_MEG =
+ StringUtils.TraditionalBinaryPrefix.string2long("1m");
+
+ private String jobID;
+
+ // Set by build(); once true, both process(...) methods refuse further input.
+ private boolean finalized = false;
+
+ // The job being assembled; returned by build().
+ private LoggedJob result = new LoggedJob();
+
+ private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
+ private Map<String, LoggedTask> reduceTasks =
+ new HashMap<String, LoggedTask>();
+ private Map<String, LoggedTask> otherTasks =
+ new HashMap<String, LoggedTask>();
+
+ private Map<String, LoggedTaskAttempt> attempts =
+ new HashMap<String, LoggedTaskAttempt>();
+
+ private Map<ParsedHost, ParsedHost> allHosts =
+ new HashMap<ParsedHost, ParsedHost>();
+
+ /**
+ * The number of splits a task can have, before we ignore them all.
+ */
+ private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
+ /**
+ * The regular expression used to parse task attempt IDs in job tracker logs.
+ * Capturing group 1 is the run of digits after the last underscore.
+ */
+ private final static Pattern taskAttemptIDPattern =
+ Pattern.compile(".*_([0-9]+)");
+
+ // Percentile points used for the attempt-time CDFs; filled in by the
+ // instance initializer.
+ private int[] attemptTimesPercentiles = null;
+
+ // Use this to search within the java options to get heap sizes.
+ // The pattern has a single capturing group: group 1 holds the heap size
+ // number together with its order-of-magnitude suffix (e.g. "512m").
+ private static final Pattern heapPattern =
+ Pattern.compile("-Xmx([0-9]+[kKmMgGtT])");
+
+ /**
+ * Create a builder for one job.
+ *
+ * @param jobID
+ * The ID of the job; stored as-is and returned by getJobID().
+ */
+ public JobBuilder(String jobID) {
+ this.jobID = jobID;
+ }
+
+ /** @return the job ID this builder was constructed with. */
+ public String getJobID() {
+ return jobID;
+ }
+
+ // Instance initializer: unless already set, use the percentile points
+ // 5, 10, ..., 95 (19 values).
+ {
+   if (attemptTimesPercentiles == null) {
+     int[] percentiles = new int[19];
+     int slot = 0;
+
+     for (int percent = 5; percent <= 95; percent += 5) {
+       percentiles[slot++] = percent;
+     }
+
+     attemptTimesPercentiles = percentiles;
+   }
+ }
+
+ /**
+  * Process one {@link HistoryEvent} by handing it to the handler for its
+  * concrete type.
+  *
+  * @param event
+  *          The {@link HistoryEvent} to be processed.
+  * @throws IllegalStateException
+  *           if called after the {@link LoggedJob} has been built
+  * @throws IllegalArgumentException
+  *           if the event matches none of the known event types
+  */
+ public void process(HistoryEvent event) {
+   if (finalized) {
+     throw new IllegalStateException(
+         "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
+   }
+
+   // Types are tested in lexicographical order by class name; the first
+   // match handles the event and returns.
+   if (event instanceof JobFinishedEvent) {
+     processJobFinishedEvent((JobFinishedEvent) event);
+     return;
+   }
+   if (event instanceof JobInfoChangeEvent) {
+     processJobInfoChangeEvent((JobInfoChangeEvent) event);
+     return;
+   }
+   if (event instanceof JobInitedEvent) {
+     processJobInitedEvent((JobInitedEvent) event);
+     return;
+   }
+   if (event instanceof JobPriorityChangeEvent) {
+     processJobPriorityChangeEvent((JobPriorityChangeEvent) event);
+     return;
+   }
+   if (event instanceof JobStatusChangedEvent) {
+     processJobStatusChangedEvent((JobStatusChangedEvent) event);
+     return;
+   }
+   if (event instanceof JobSubmittedEvent) {
+     processJobSubmittedEvent((JobSubmittedEvent) event);
+     return;
+   }
+   if (event instanceof JobUnsuccessfulCompletionEvent) {
+     processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event);
+     return;
+   }
+   if (event instanceof MapAttemptFinishedEvent) {
+     processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+     return;
+   }
+   if (event instanceof ReduceAttemptFinishedEvent) {
+     processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
+     return;
+   }
+   if (event instanceof TaskAttemptFinishedEvent) {
+     processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+     return;
+   }
+   if (event instanceof TaskAttemptStartedEvent) {
+     processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
+     return;
+   }
+   if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+     processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
+     return;
+   }
+   if (event instanceof TaskFailedEvent) {
+     processTaskFailedEvent((TaskFailedEvent) event);
+     return;
+   }
+   if (event instanceof TaskFinishedEvent) {
+     processTaskFinishedEvent((TaskFinishedEvent) event);
+     return;
+   }
+   if (event instanceof TaskStartedEvent) {
+     processTaskStartedEvent((TaskStartedEvent) event);
+     return;
+   }
+   if (event instanceof TaskUpdatedEvent) {
+     processTaskUpdatedEvent((TaskUpdatedEvent) event);
+     return;
+   }
+
+   throw new IllegalArgumentException(
+       "JobBuilder.process(HistoryEvent): unknown event type");
+ }
+
+ /**
+  * Look up the first of several candidate property names that is set.
+  *
+  * @param conf
+  *          the properties to search
+  * @param names
+  *          candidate property names, tried in array order
+  * @param defaultValue
+  *          returned when none of the candidates has a value
+  * @return the value of the first candidate that is set, else defaultValue
+  */
+ private String extract(Properties conf, String[] names, String defaultValue) {
+   for (int i = 0; i < names.length; ++i) {
+     String value = conf.getProperty(names[i]);
+
+     if (value != null) {
+       return value;
+     }
+   }
+
+   return defaultValue;
+ }
+
+ /**
+  * Find the -Xmx heap size in a java-options property and convert it to
+  * megabytes.
+  *
+  * @param conf
+  *          the properties to search
+  * @param names
+  *          candidate names of the java-options property
+  * @return the heap size in megabytes taken from the last -Xmx occurrence, or
+  *         null if the property is not set or contains no -Xmx setting
+  */
+ private Integer extractMegabytes(Properties conf, String[] names) {
+   String javaOptions = extract(conf, names, null);
+   if (javaOptions == null) {
+     return null;
+   }
+
+   Integer megabytes = null;
+
+   // Later occurrences overwrite earlier ones, so the last -Xmx wins.
+   for (Matcher m = heapPattern.matcher(javaOptions); m.find();) {
+     long bytes = StringUtils.TraditionalBinaryPrefix.string2long(m.group(1));
+     megabytes = (int) (bytes / BYTES_IN_MEG);
+   }
+
+   return megabytes;
+ }
+
+ /** Store the heap size on the job being built; a null value is ignored. */
+ private void maybeSetHeapMegabytes(Integer megabytes) {
+   if (megabytes == null) {
+     return;
+   }
+   result.setHeapMegabytes(megabytes);
+ }
+
+ /** Store the map-side size on the job being built; null is ignored. */
+ private void maybeSetJobMapMB(Integer megabytes) {
+   if (megabytes == null) {
+     return;
+   }
+   result.setJobMapMB(megabytes);
+ }
+
+ /** Store the reduce-side size on the job being built; null is ignored. */
+ private void maybeSetJobReduceMB(Integer megabytes) {
+   if (megabytes == null) {
+     return;
+   }
+   result.setJobReduceMB(megabytes);
+ }
+
+ /**
+  * Process a collection of JobConf {@link Properties}. This may be called
+  * more than once, and before, during or after the events are processed.
+  *
+  * The queue name (default "default") and job name are always set; the three
+  * heap-size values are only set when the matching java-options property
+  * yields one.
+  *
+  * @param conf
+  *          The job conf properties to be added.
+  * @throws IllegalStateException
+  *           if called after the {@link LoggedJob} has been built
+  */
+ public void process(Properties conf) {
+   if (finalized) {
+     throw new IllegalStateException(
+         "JobBuilder.process(Properties conf) called after LoggedJob built");
+   }
+
+   String queue =
+       extract(conf, JobConfPropertyNames.QUEUE_NAMES.getCandidates(),
+           "default");
+   result.setQueue(queue);
+
+   String jobName =
+       extract(conf, JobConfPropertyNames.JOB_NAMES.getCandidates(), null);
+   result.setJobName(jobName);
+
+   Integer taskHeapMB =
+       extractMegabytes(conf,
+           JobConfPropertyNames.TASK_JAVA_OPTS_S.getCandidates());
+   maybeSetHeapMegabytes(taskHeapMB);
+
+   Integer mapHeapMB =
+       extractMegabytes(conf,
+           JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates());
+   maybeSetJobMapMB(mapHeapMB);
+
+   Integer reduceHeapMB =
+       extractMegabytes(conf,
+           JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates());
+   maybeSetJobReduceMB(reduceHeapMB);
+ }
+
+ /**
+  * Request the builder to build the final object. Once called, the
+  * {@link JobBuilder} would accept no more events or job-conf properties.
+  *
+  * Before returning, the attempt runtimes of the job's map and reduce tasks
+  * are gathered into histograms and stored on the job as CDFs, together with
+  * the distribution of the attempt number at which map attempts succeeded.
+  * Attempts lacking a positive start or finish time contribute no runtime.
+  *
+  * @return Parsed {@link LoggedJob} object.
+  */
+ public LoggedJob build() {
+   // The main job here is to build CDFs
+   finalized = true;
+
+   // initialize all the per-job statistics gathering places
+   Histogram[] successfulMapAttemptTimes =
+       new Histogram[ParsedHost.numberOfDistances() + 1];
+   for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
+     successfulMapAttemptTimes[i] = new Histogram();
+   }
+
+   Histogram successfulReduceAttemptTimes = new Histogram();
+   Histogram[] failedMapAttemptTimes =
+       new Histogram[ParsedHost.numberOfDistances() + 1];
+   for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
+     failedMapAttemptTimes[i] = new Histogram();
+   }
+   Histogram failedReduceAttemptTimes = new Histogram();
+
+   Histogram successfulNthMapperAttempts = new Histogram();
+
+   for (LoggedTask task : result.getMapTasks()) {
+     for (LoggedTaskAttempt attempt : task.getAttempts()) {
+       // Without both timestamps there is no runtime to record.
+       if (attempt.getFinishTime() <= 0 || attempt.getStartTime() <= 0) {
+         continue;
+       }
+
+       long runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+       // Start in the last bucket; a successful attempt may move to the
+       // smallest distance to any of the task's preferred locations.
+       int distance = successfulMapAttemptTimes.length - 1;
+
+       if (attempt.getResult() == Values.SUCCESS) {
+         LoggedLocation host = attempt.getLocation();
+
+         List<LoggedLocation> locs = task.getPreferredLocations();
+
+         if (host != null && locs != null) {
+           for (LoggedLocation loc : locs) {
+             ParsedHost preferedLoc = new ParsedHost(loc);
+
+             distance =
+                 Math.min(distance, preferedLoc
+                     .distance(new ParsedHost(host)));
+           }
+         }
+
+         successfulMapAttemptTimes[distance].enter(runtime);
+
+         // Record which attempt number (the trailing digits of the attempt
+         // ID) finally succeeded.
+         String attemptID = attempt.getAttemptID();
+
+         if (attemptID != null) {
+           Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
+
+           if (matcher.matches()) {
+             String attemptNumberString = matcher.group(1);
+
+             if (attemptNumberString != null) {
+               int attemptNumber = Integer.parseInt(attemptNumberString);
+
+               successfulNthMapperAttempts.enter(attemptNumber);
+             }
+           }
+         }
+       } else if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
+         failedMapAttemptTimes[distance].enter(runtime);
+       }
+     }
+   }
+
+   for (LoggedTask task : result.getReduceTasks()) {
+     for (LoggedTaskAttempt attempt : task.getAttempts()) {
+       // As in the map loop, skip attempts that lack a positive start or
+       // finish time instead of recording a meaningless difference.
+       if (attempt.getFinishTime() <= 0 || attempt.getStartTime() <= 0) {
+         continue;
+       }
+
+       long runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+       if (attempt.getResult() == Values.SUCCESS) {
+         successfulReduceAttemptTimes.enter(runtime);
+       } else if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
+         failedReduceAttemptTimes.enter(runtime);
+       }
+     }
+   }
+
+   result.setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes));
+
+   LoggedDiscreteCDF failedReduce = new LoggedDiscreteCDF();
+   failedReduce.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100);
+   result.setFailedReduceAttemptCDF(failedReduce);
+
+   result
+       .setSuccessfulMapAttemptCDFs(mapCDFArrayList(successfulMapAttemptTimes));
+
+   LoggedDiscreteCDF succReduce = new LoggedDiscreteCDF();
+   succReduce.setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles,
+       100);
+   result.setSuccessfulReduceAttemptCDF(succReduce);
+
+   // NOTE(review): this overwrites the failed-map CDFs set a few lines
+   // above. Kept as in the original; confirm whether it is intentional.
+   result.setFailedMapAttemptCDFs(null);
+
+   long totalSuccessfulAttempts = 0L;
+   long maxTriesToSucceed = 0L;
+
+   for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+     totalSuccessfulAttempts += ent.getValue();
+     maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey());
+   }
+
+   if (totalSuccessfulAttempts > 0L) {
+     // Indexed by attempt number; a new double[] is already all zeros.
+     double[] successAfterI = new double[(int) maxTriesToSucceed + 1];
+
+     for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+       successAfterI[ent.getKey().intValue()] =
+           ((double) ent.getValue()) / totalSuccessfulAttempts;
+     }
+     result.setMapperTriesToSucceed(successAfterI);
+   } else {
+     result.setMapperTriesToSucceed(null);
+   }
+
+   return result;
+ }
+
+  /**
+   * Builds one {@link LoggedDiscreteCDF} per histogram, in the same order as
+   * the input array.
+   *
+   * @param data
+   *          the histograms to convert
+   * @return a freshly allocated list with one CDF for each element of
+   *         {@code data}
+   */
+  private ArrayList<LoggedDiscreteCDF> mapCDFArrayList(Histogram[] data) {
+    ArrayList<LoggedDiscreteCDF> cdfs = new ArrayList<LoggedDiscreteCDF>();
+
+    for (int i = 0; i < data.length; ++i) {
+      LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
+      cdf.setCDF(data[i], attemptTimesPercentiles, 100);
+      cdfs.add(cdf);
+    }
+
+    return cdfs;
+  }
+
+  /**
+   * Translates a status or task-type name into the matching {@link Values}
+   * constant.
+   *
+   * The names {@code JOB_CLEANUP} and {@code JOB_SETUP} (compared ignoring
+   * case) are mapped to {@link Values#CLEANUP} and {@link Values#SETUP}; any
+   * other name is upper-cased and looked up with {@code Values.valueOf}.
+   *
+   * @param name
+   *          the name to translate; must not be null
+   * @return the corresponding constant
+   * @throws IllegalArgumentException
+   *           if {@code Values.valueOf} rejects the upper-cased name
+   */
+  private static Values getPre21Value(String name) {
+    if (name.equalsIgnoreCase("JOB_CLEANUP")) {
+      return Values.CLEANUP;
+    }
+    if (name.equalsIgnoreCase("JOB_SETUP")) {
+      return Values.SETUP;
+    }
+
+    // Upper-case with a fixed locale. The default-locale variant would turn
+    // 'i' into a dotted capital under e.g. a Turkish locale, so a perfectly
+    // valid name such as "failed" would no longer match its constant.
+    return Values.valueOf(name.toUpperCase(java.util.Locale.ENGLISH));
+  }
+
+  /**
+   * Copies the finish time carried by the event onto the task it names. The
+   * task is looked up with {@link #getTask(String)}; if it is not known the
+   * event is ignored.
+   */
+  private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
+    String taskName = event.getTaskId().toString();
+    LoggedTask known = getTask(taskName);
+
+    if (known != null) {
+      known.setFinishTime(event.getFinishTime());
+    }
+  }
+
+  /**
+   * Handles the start of a task: the task is fetched, or created if this is
+   * the first time it is seen, and then receives its start time and the
+   * preferred locations derived from the event's split locations.
+   */
+  private void processTaskStartedEvent(TaskStartedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+
+    // This is the only task event that is allowed to create the task.
+    LoggedTask started = getOrMakeTask(kind, taskName, true);
+    started.setStartTime(event.getStartTime());
+
+    String splits = event.getSplitLocations();
+    started.setPreferredLocations(preferredLocationForSplits(splits));
+  }
+
+  /**
+   * Records finish time, final status and counters for a finished task. The
+   * task is never created here; an event for an unknown task is dropped.
+   */
+  private void processTaskFinishedEvent(TaskFinishedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    LoggedTask finished = getOrMakeTask(kind, taskName, false);
+
+    if (finished != null) {
+      finished.setFinishTime(event.getFinishTime());
+      finished.setTaskStatus(getPre21Value(event.getTaskStatus()));
+      finished.incorporateCounters(event.getCounters());
+    }
+  }
+
+  /**
+   * Records finish time and final status for a failed task. The task is
+   * never created here; an event for an unknown task is dropped.
+   */
+  private void processTaskFailedEvent(TaskFailedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    LoggedTask failed = getOrMakeTask(kind, taskName, false);
+
+    if (failed != null) {
+      failed.setFinishTime(event.getFinishTime());
+      failed.setTaskStatus(getPre21Value(event.getTaskStatus()));
+    }
+  }
+
+  /**
+   * Records the outcome, host location (when the host name can be parsed)
+   * and finish time of an attempt that did not complete successfully. If no
+   * attempt can be obtained for the event, the event is dropped.
+   */
+  private void processTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    String attemptName = event.getTaskAttemptId().toString();
+    LoggedTaskAttempt unsuccessful =
+        getOrMakeTaskAttempt(kind, taskName, attemptName);
+
+    if (unsuccessful != null) {
+      unsuccessful.setResult(getPre21Value(event.getTaskStatus()));
+
+      // The host name may not be parseable; in that case keep no location.
+      ParsedHost where = getAndRecordParsedHost(event.getHostname());
+      if (where != null) {
+        unsuccessful.setLocation(where.makeLoggedLocation());
+      }
+
+      unsuccessful.setFinishTime(event.getFinishTime());
+    }
+  }
+
+  /**
+   * Records the start time of a task attempt. If no attempt can be obtained
+   * for the event, the event is dropped.
+   */
+  private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    String attemptName = event.getTaskAttemptId().toString();
+    LoggedTaskAttempt started =
+        getOrMakeTaskAttempt(kind, taskName, attemptName);
+
+    if (started != null) {
+      started.setStartTime(event.getStartTime());
+    }
+  }
+
+  /**
+   * Records the outcome, host location, finish time and counters of a
+   * finished task attempt. If no attempt can be obtained for the event, the
+   * event is dropped.
+   */
+  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+
+    // getAndRecordParsedHost returns null when the host name cannot be
+    // parsed; only record a location when one is available, exactly as the
+    // unsuccessful-completion handler does, instead of failing with a
+    // NullPointerException.
+    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
+    if (parsedHost != null) {
+      attempt.setLocation(parsedHost.makeLoggedLocation());
+    }
+
+    attempt.setFinishTime(event.getFinishTime());
+    attempt.incorporateCounters(event.getCounters());
+  }
+
+  /**
+   * Records the outcome, host name, finish/shuffle/sort times and counters
+   * of a finished reduce attempt. If no attempt can be obtained for the
+   * event, the event is dropped.
+   */
+  private void processReduceAttemptFinishedEvent(
+      ReduceAttemptFinishedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    String attemptName = event.getAttemptId().toString();
+    LoggedTaskAttempt finished =
+        getOrMakeTaskAttempt(kind, taskName, attemptName);
+
+    if (finished != null) {
+      finished.setResult(getPre21Value(event.getTaskStatus()));
+      finished.setHostName(event.getHostname());
+      // XXX The event may carry location info as well. It is redundant
+      // today, but extracting it here would add some future-proofing.
+      finished.setFinishTime(event.getFinishTime());
+      finished.setShuffleFinished(event.getShuffleFinishTime());
+      finished.setSortFinished(event.getSortFinishTime());
+      finished.incorporateCounters(event.getCounters());
+    }
+  }
+
+  /**
+   * Records the outcome, host name, finish time and counters of a finished
+   * map attempt. If no attempt can be obtained for the event, the event is
+   * dropped.
+   */
+  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    TaskType kind = event.getTaskType();
+    String taskName = event.getTaskId().toString();
+    String attemptName = event.getAttemptId().toString();
+    LoggedTaskAttempt finished =
+        getOrMakeTaskAttempt(kind, taskName, attemptName);
+
+    if (finished != null) {
+      finished.setResult(getPre21Value(event.getTaskStatus()));
+      finished.setHostName(event.getHostname());
+      // XXX The event may carry location info as well. It is redundant
+      // today, but extracting it here would add some future-proofing.
+      finished.setFinishTime(event.getFinishTime());
+      finished.incorporateCounters(event.getCounters());
+    }
+  }
+
+  /**
+   * Stores the outcome named by the event's status string, and the finish
+   * time, on the job being built.
+   */
+  private void processJobUnsuccessfulCompletionEvent(
+      JobUnsuccessfulCompletionEvent event) {
+    Pre21JobHistoryConstants.Values outcome =
+        Pre21JobHistoryConstants.Values.valueOf(event.getStatus());
+
+    result.setOutcome(outcome);
+    result.setFinishTime(event.getFinishTime());
+  }
+
+  /**
+   * Copies the job ID, job name, user name and submit time from the
+   * submission event onto the job being built.
+   */
+  private void processJobSubmittedEvent(JobSubmittedEvent event) {
+    String submittedJobId = event.getJobId().toString();
+
+    result.setJobID(submittedJobId);
+    result.setJobName(event.getJobName());
+    result.setUser(event.getUserName());
+    result.setSubmitTime(event.getSubmitTime());
+  }
+
+  /**
+   * Stores the outcome named by the event's status string on the job being
+   * built.
+   */
+  private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
+    Pre21JobHistoryConstants.Values outcome =
+        Pre21JobHistoryConstants.Values.valueOf(event.getStatus());
+
+    result.setOutcome(outcome);
+  }
+
+  /**
+   * Stores the new priority on the job being built. The event's priority is
+   * converted by name into the {@link LoggedJob.JobPriority} enumeration.
+   */
+  private void processJobPriorityChangeEvent(JobPriorityChangeEvent event) {
+    String priorityName = event.getPriority().toString();
+
+    result.setPriority(LoggedJob.JobPriority.valueOf(priorityName));
+  }
+
+  /**
+   * Copies the launch time and the total number of map and reduce tasks from
+   * the initialization event onto the job being built.
+   */
+  private void processJobInitedEvent(JobInitedEvent event) {
+    long launched = event.getLaunchTime();
+    result.setLaunchTime(launched);
+
+    int maps = event.getTotalMaps();
+    result.setTotalMaps(maps);
+
+    int reduces = event.getTotalReduces();
+    result.setTotalReduces(reduces);
+  }
+
+  /**
+   * Updates the launch time of the job being built from the event. The
+   * submit time carried by the event is not used here.
+   */
+  private void processJobInfoChangeEvent(JobInfoChangeEvent event) {
+    long launched = event.getLaunchTime();
+
+    result.setLaunchTime(launched);
+  }
+
+  /**
+   * Marks the job being built as successfully finished: stores the finish
+   * time from the event, sets the job ID from this builder's {@code jobID},
+   * and sets the outcome to {@link Values#SUCCESS}.
+   */
+  private void processJobFinishedEvent(JobFinishedEvent event) {
+    long finished = event.getFinishTime();
+
+    result.setFinishTime(finished);
+    result.setJobID(jobID);
+    result.setOutcome(Values.SUCCESS);
+  }
+
+  /**
+   * Looks a task up by name, trying the map tasks first, then the reduce
+   * tasks, and finally the other tasks.
+   *
+   * @param taskIDname
+   *          the task ID name, as a string
+   * @return the task registered under that name, or null if none of the
+   *         three maps contains it
+   */
+  private LoggedTask getTask(String taskIDname) {
+    LoggedTask found = mapTasks.get(taskIDname);
+
+    if (found == null) {
+      found = reduceTasks.get(taskIDname);
+    }
+
+    if (found == null) {
+      found = otherTasks.get(taskIDname);
+    }
+
+    return found;
+  }
+
+  /**
+   * Finds the task with the given name among the tasks of the given type,
+   * optionally creating and registering it when it does not exist yet.
+   * Types other than MAP and REDUCE are kept with the "other" tasks.
+   *
+   * @param type
+   *          the task type
+   * @param taskIDname
+   *          the task ID name, as a string
+   * @param allowCreate
+   *          if true, we can create a task.
+   * @return the existing or newly created task, or null if the task is
+   *         unknown and {@code allowCreate} is false
+   */
+  private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
+      boolean allowCreate) {
+    // Start from the "other" bucket and narrow down for maps and reduces.
+    Map<String, LoggedTask> index = otherTasks;
+    List<LoggedTask> bucket = this.result.getOtherTasks();
+
+    switch (type) {
+    case MAP:
+      index = mapTasks;
+      bucket = this.result.getMapTasks();
+      break;
+
+    case REDUCE:
+      index = reduceTasks;
+      bucket = this.result.getReduceTasks();
+      break;
+
+    default:
+      // keep the "other" bucket
+    }
+
+    LoggedTask existing = index.get(taskIDname);
+
+    if (existing != null || !allowCreate) {
+      return existing;
+    }
+
+    // A new task is registered both in the lookup map and in the job's list.
+    LoggedTask created = new LoggedTask();
+    created.setTaskType(getPre21Value(type.toString()));
+    created.setTaskID(taskIDname);
+    index.put(taskIDname, created);
+    bucket.add(created);
+
+    return created;
+  }
+
+  /**
+   * Finds the attempt with the given name, creating it when it is not yet
+   * known and its owning task exists. The owning task itself is never
+   * created here.
+   *
+   * @param type
+   *          the type of the owning task
+   * @param taskIDName
+   *          the name of the owning task
+   * @param taskAttemptName
+   *          the name of the attempt
+   * @return the existing or newly created attempt, or null if the attempt is
+   *         unknown and its task is unknown too
+   */
+  private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
+      String taskIDName, String taskAttemptName) {
+    LoggedTask owner = getOrMakeTask(type, taskIDName, false);
+    LoggedTaskAttempt existing = attempts.get(taskAttemptName);
+
+    if (existing != null || owner == null) {
+      return existing;
+    }
+
+    // A new attempt is registered in the lookup map and on its task.
+    LoggedTaskAttempt created = new LoggedTaskAttempt();
+    created.setAttemptID(taskAttemptName);
+    attempts.put(taskAttemptName, created);
+    owner.getAttempts().add(created);
+
+    return created;
+  }
+
+  /**
+   * Parses a host name and returns the canonical {@link ParsedHost} for it.
+   * The first time a host is seen it is recorded in {@code allHosts} and
+   * becomes the canonical instance; later calls for an equal host return
+   * that recorded instance.
+   *
+   * @param hostName
+   *          the host name to parse
+   * @return the canonical parsed host, or null if {@code ParsedHost.parse}
+   *         returns null for the name
+   */
+  private ParsedHost getAndRecordParsedHost(String hostName) {
+    ParsedHost parsed = ParsedHost.parse(hostName);
+
+    if (parsed == null) {
+      return null;
+    }
+
+    ParsedHost canonical = allHosts.get(parsed);
+
+    if (canonical == null) {
+      allHosts.put(parsed, parsed);
+      canonical = parsed;
+    }
+
+    return canonical;
+  }
+
+  /**
+   * Turns a comma-separated list of split host names into logged locations.
+   * Every host that can be parsed is recorded through
+   * {@link #getAndRecordParsedHost(String)}; hosts that cannot be parsed are
+   * left out of the result.
+   *
+   * @param splits
+   *          the comma-separated host names, may be null
+   * @return the locations of the parseable hosts, or null if {@code splits}
+   *         is null or names more than {@code MAXIMUM_PREFERRED_LOCATIONS}
+   *         hosts
+   */
+  private ArrayList<LoggedLocation> preferredLocationForSplits(String splits) {
+    if (splits == null) {
+      return null;
+    }
+
+    StringTokenizer hosts = new StringTokenizer(splits, ",", false);
+
+    if (hosts.countTokens() > MAXIMUM_PREFERRED_LOCATIONS) {
+      return null;
+    }
+
+    ArrayList<LoggedLocation> locations = new ArrayList<LoggedLocation>();
+
+    while (hosts.hasMoreTokens()) {
+      ParsedHost node = getAndRecordParsedHost(hosts.nextToken());
+
+      if (node != null) {
+        locations.add(node.makeLoggedLocation());
+      }
+    }
+
+    return locations;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Symbolic names for job configuration settings. Each constant carries one
+ * or more candidate property keys under which the setting may be found in a
+ * job configuration.
+ */
+public enum JobConfPropertyNames {
+  QUEUE_NAMES("mapred.job.queue.name"),
+  JOB_NAMES("mapred.job.name"),
+  TASK_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS),
+  MAP_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS,
+      JobConf.MAPRED_MAP_TASK_JAVA_OPTS),
+  REDUCE_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS,
+      JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS);
+
+  /** The candidate property keys; never modified after construction. */
+  private final String[] candidates;
+
+  JobConfPropertyNames(String... candidates) {
+    this.candidates = candidates;
+  }
+
+  /**
+   * Get the candidate property keys of this setting.
+   *
+   * @return a copy of the candidate keys, in declaration order. A copy is
+   *         returned so that callers cannot alter the keys shared by every
+   *         user of this enum constant.
+   */
+  public String[] getCandidates() {
+    return candidates.clone();
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * {@link JobConfigurationParser} parses the job configuration xml file, and
+ * extracts the framework specific properties its caller is interested in.
+ * The document is currently loaded into a DOM tree as a whole; parsing it
+ * with a stream-parser, which would be more memory efficient, is an
+ * optimization left for a future release.
+ */
+public class JobConfigurationParser {
+  final private Set<String> interested;
+
+  /**
+   * Constructor
+   *
+   * @param interested
+   *          properties we should extract from the job configuration xml.
+   */
+  public JobConfigurationParser(List<String> interested) {
+    this.interested = new HashSet<String>(interested);
+  }
+
+  /**
+   * Parse the job configuration file (as an input stream) and return a
+   * {@link Properties} collection. The input stream will not be closed after
+   * return from the call.
+   *
+   * @param input
+   *          The input data.
+   * @return A {@link Properties} collection holding the interesting
+   *         properties found in the job configuration xml, or null if the
+   *         document is not well-formed xml, the parser cannot be set up, or
+   *         the root element is not {@code <configuration>}.
+   * @throws IOException
+   *           if reading the input fails
+   */
+  Properties parse(InputStream input) throws IOException {
+    Properties result = new Properties();
+
+    try {
+      // NOTE(review): the parser is created with default settings. If
+      // configuration files can come from untrusted sources, consider
+      // disabling DTDs and external entities here.
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+      DocumentBuilder db = dbf.newDocumentBuilder();
+
+      Document doc = db.parse(input);
+
+      Element root = doc.getDocumentElement();
+
+      if (!"configuration".equals(root.getTagName())) {
+        System.out.print("root is not a configuration node");
+        return null;
+      }
+
+      NodeList props = root.getChildNodes();
+
+      for (int i = 0; i < props.getLength(); ++i) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName())) {
+          // Only reported; the element's fields are still examined below.
+          System.out.print("bad conf file: element not <property>");
+        }
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element)) {
+            continue;
+          }
+
+          Element field = (Element) fieldNode;
+          String text = leadingText(field);
+          if (text == null) {
+            continue;
+          }
+          if ("name".equals(field.getTagName())) {
+            attr = text.trim();
+          }
+          if ("value".equals(field.getTagName())) {
+            value = text;
+          }
+        }
+
+        // A property without a usable name can never be stored.
+        if (attr != null && value != null && interested.contains(attr)) {
+          result.put(attr, value);
+        }
+      }
+    } catch (ParserConfigurationException e) {
+      return null;
+    } catch (SAXException e) {
+      return null;
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns the character data of the first child of {@code field}, or null
+   * if the element is empty or its first child is not a text node (for
+   * example a comment or a nested element). Checking the node type avoids a
+   * ClassCastException on such input.
+   */
+  private static String leadingText(Element field) {
+    Node first = field.getFirstChild();
+    if (first instanceof Text) {
+      return ((Text) first).getData();
+    }
+    return null;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record successful completion of job. Instances are immutable:
+ * all state is supplied to the constructor and exposed through getters.
+ */
+public class JobFinishedEvent implements HistoryEvent {
+  private final JobID jobId;
+  private final long finishTime;
+  private final int finishedMaps;
+  private final int finishedReduces;
+  private final int failedMaps;
+  private final int failedReduces;
+  private final Counters mapCounters;
+  private final Counters reduceCounters;
+  private final Counters totalCounters;
+
+  /**
+   * Create an event to record successful job completion
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps The number of finished maps
+   * @param finishedReduces The number of finished reduces
+   * @param failedMaps The number of failed maps
+   * @param failedReduces The number of failed reduces
+   * @param mapCounters Map Counters for the job
+   * @param reduceCounters Reduce Counters for the job
+   * @param totalCounters Total Counters for the job
+   */
+  public JobFinishedEvent(JobID id, long finishTime,
+      int finishedMaps, int finishedReduces,
+      int failedMaps, int failedReduces,
+      Counters mapCounters, Counters reduceCounters,
+      Counters totalCounters) {
+    this.jobId = id;
+    this.finishTime = finishTime;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.failedMaps = failedMaps;
+    this.failedReduces = failedReduces;
+    this.mapCounters = mapCounters;
+    this.reduceCounters = reduceCounters;
+    this.totalCounters = totalCounters;
+  }
+
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_FINISHED;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobid() {
+    return jobId;
+  }
+
+  /** Get the job finish time */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** Get the number of finished maps for the job */
+  public int getFinishedMaps() {
+    return finishedMaps;
+  }
+
+  /** Get the number of finished reducers for the job */
+  public int getFinishedReduces() {
+    return finishedReduces;
+  }
+
+  /** Get the number of failed maps for the job */
+  public int getFailedMaps() {
+    return failedMaps;
+  }
+
+  /** Get the number of failed reducers for the job */
+  public int getFailedReduces() {
+    return failedReduces;
+  }
+
+  /** Get the total counters for the job */
+  public Counters getTotalCounters() {
+    return totalCounters;
+  }
+
+  /** Get the Map counters for the job */
+  public Counters getMapCounters() {
+    return mapCounters;
+  }
+
+  /** Get the reduce counters for the job */
+  public Counters getReduceCounters() {
+    return reduceCounters;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * {@link JobHistoryParser} is the contract every Job History file parser
+ * fulfils: it hands out the events of a history file one at a time and, being
+ * a {@link Closeable}, can be closed when the caller is done with it.
+ */
+public interface JobHistoryParser extends Closeable {
+  /**
+   * Read the next {@link HistoryEvent} from the history file.
+   *
+   * @return the next {@link HistoryEvent}, or null once there are no more
+   *         events left.
+   * @throws IOException
+   */
+  HistoryEvent nextEvent() throws IOException;
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link JobHistoryParserFactory} is a static factory that attempts to
+ * determine the version of a job history stream and return a proper parser
+ * for it.
+ */
+public class JobHistoryParserFactory {
+  /**
+   * Probe the stream with every known {@link VersionDetector}, in declaration
+   * order, and build a parser with the first detector that recognizes it.
+   *
+   * @param ris
+   *          the job history stream to examine and then parse
+   * @return a parser reading from {@code ris}
+   * @throws IOException
+   *           if probing or parser construction fails, or if no detector
+   *           recognizes the stream
+   */
+  public static JobHistoryParser getParser(RewindableInputStream ris)
+      throws IOException {
+    VersionDetector[] detectors = VersionDetector.values();
+
+    for (int i = 0; i < detectors.length; ++i) {
+      VersionDetector detector = detectors[i];
+      boolean recognized = detector.canParse(ris);
+
+      // Rewind after every probe, whether or not it matched, before the
+      // stream is handed to the next detector or to the parser.
+      ris.rewind();
+
+      if (!recognized) {
+        continue;
+      }
+
+      return detector.newInstance(ris);
+    }
+
+    throw new IOException("No suitable parser.");
+  }
+
+  /** One constant per supported job history format. */
+  enum VersionDetector {
+    Hadoop20() {
+
+      @Override
+      public JobHistoryParser newInstance(InputStream input) throws IOException {
+        return new Hadoop20JHParser(input);
+      }
+
+      @Override
+      public boolean canParse(InputStream input) throws IOException {
+        return Hadoop20JHParser.canParse(input);
+      }
+    };
+
+    /** Tell whether this detector's format matches the given input. */
+    abstract boolean canParse(InputStream input) throws IOException;
+
+    /** Create a parser for this detector's format over the given input. */
+    abstract JobHistoryParser newInstance(InputStream input) throws IOException;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record changes in the submit and launch time of
+ * a job. Instances are immutable: all state is supplied to the constructor.
+ */
+public class JobInfoChangeEvent implements HistoryEvent {
+  private final JobID jobId;
+  private final long submitTime;
+  private final long launchTime;
+
+  /**
+   * Create an event to record the submit and launch time of a job
+   * @param id Job Id
+   * @param submitTime Submit time of the job
+   * @param launchTime Launch time of the job
+   */
+  public JobInfoChangeEvent(JobID id, long submitTime, long launchTime) {
+    this.jobId = id;
+    this.submitTime = submitTime;
+    this.launchTime = launchTime;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** Get the Job submit time */
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  /** Get the Job launch time */
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_INFO_CHANGED;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the initialization of a job. Instances are immutable: all
+ * state is supplied to the constructor.
+ */
+public class JobInitedEvent implements HistoryEvent {
+  private final JobID jobId;
+  private final long launchTime;
+  private final int totalMaps;
+  private final int totalReduces;
+  private final String jobStatus;
+
+  /**
+   * Create an event to record job initialization
+   * @param id Job ID
+   * @param launchTime Launch time of the job
+   * @param totalMaps Total number of maps of the job
+   * @param totalReduces Total number of reduces of the job
+   * @param jobStatus Status of the job
+   */
+  public JobInitedEvent(JobID id, long launchTime, int totalMaps,
+      int totalReduces, String jobStatus) {
+    this.jobId = id;
+    this.launchTime = launchTime;
+    this.totalMaps = totalMaps;
+    this.totalReduces = totalReduces;
+    this.jobStatus = jobStatus;
+  }
+
+  /** Get the job ID */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** Get the launch time */
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  /** Get the total number of maps */
+  public int getTotalMaps() {
+    return totalMaps;
+  }
+
+  /** Get the total number of reduces */
+  public int getTotalReduces() {
+    return totalReduces;
+  }
+
+  /** Get the status */
+  public String getStatus() {
+    return jobStatus;
+  }
+
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_INITED;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * History event recording a change of a job's priority.
+ *
+ * Holds the ID of the affected job together with its new priority.
+ */
+public class JobPriorityChangeEvent implements HistoryEvent {
+  private JobID jobId;
+  private JobPriority priority;
+
+  /**
+   * Generate an event to record changes in Job priority.
+   *
+   * @param id Job Id
+   * @param priority The new priority of the job
+   */
+  public JobPriorityChangeEvent(JobID id, JobPriority priority) {
+    this.jobId = id;
+    this.priority = priority;
+  }
+
+  /** @return the Job ID */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** @return the job priority carried by this event */
+  public JobPriority getPriority() {
+    return priority;
+  }
+
+  /** @return the event type, always {@link EventType#JOB_PRIORITY_CHANGED} */
+  public EventType getEventType() {
+    return EventType.JOB_PRIORITY_CHANGED;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * History event recording a change of a job's status.
+ *
+ * Holds the ID of the affected job together with its new status string.
+ */
+public class JobStatusChangedEvent implements HistoryEvent {
+  private JobID jobId;
+  private String jobStatus;
+
+  /**
+   * Create an event to record the change in the Job Status.
+   *
+   * @param id Job ID
+   * @param jobStatus The new job status
+   */
+  public JobStatusChangedEvent(JobID id, String jobStatus) {
+    this.jobId = id;
+    this.jobStatus = jobStatus;
+  }
+
+  /** @return the Job Id */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** @return the job status carried by this event */
+  public String getStatus() {
+    return jobStatus;
+  }
+
+  /** @return the event type, always {@link EventType#JOB_STATUS_CHANGED} */
+  public EventType getEventType() {
+    return EventType.JOB_STATUS_CHANGED;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * History event recording the submission of a job.
+ *
+ * Carries the job ID, job name, submitting user, submit time, the path of
+ * the job configuration file and the ACLs configured for the job.
+ */
+public class JobSubmittedEvent implements HistoryEvent {
+  private JobID jobId;
+  private String jobName;
+  private String userName;
+  private long submitTime;
+  private String jobConfPath;
+  private Map<JobACL, AccessControlList> jobAcls;
+
+  /**
+   * Create an event to record job submission, with a newly created empty
+   * ACL map.
+   *
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @deprecated Use
+   *     {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
+   *     instead.
+   */
+  @Deprecated
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath) {
+    this(id, jobName, userName, submitTime, jobConfPath,
+        new HashMap<JobACL, AccessControlList>());
+  }
+
+  /**
+   * Create an event to record job submission.
+   *
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job. The map is stored by
+   *     reference; no copy is made.
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs) {
+    this.jobId = id;
+    this.jobName = jobName;
+    this.userName = userName;
+    this.submitTime = submitTime;
+    this.jobConfPath = jobConfPath;
+    this.jobAcls = jobACLs;
+  }
+
+  /** @return the Job Id */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** @return the Job name */
+  public String getJobName() {
+    return jobName;
+  }
+
+  /** @return the user name */
+  public String getUserName() {
+    return userName;
+  }
+
+  /** @return the submit time */
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  /** @return the Path for the Job Configuration file */
+  public String getJobConfPath() {
+    return jobConfPath;
+  }
+
+  /**
+   * @return the acls configured for the job; this is the same map instance
+   *     held by the event, not a copy
+   */
+  public Map<JobACL, AccessControlList> getJobAcls() {
+    return jobAcls;
+  }
+
+  /** @return the event type, always {@link EventType#JOB_SUBMITTED} */
+  public EventType getEventType() {
+    return EventType.JOB_SUBMITTED;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * History event recording the unsuccessful (failed or killed) completion
+ * of a job.
+ *
+ * Carries the job ID, the finish time, the number of finished map and
+ * reduce tasks, and the job status string.
+ */
+public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
+  private JobID jobId;
+  private long finishTime;
+  private int finishedMaps;
+  private int finishedReduces;
+  private String jobStatus;
+
+  /**
+   * Create an event to record unsuccessful completion (killed/failed) of jobs.
+   *
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps Number of finished maps
+   * @param finishedReduces Number of finished reduces
+   * @param status Status of the job
+   */
+  public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
+      int finishedMaps, int finishedReduces, String status) {
+    this.jobId = id;
+    this.finishTime = finishTime;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.jobStatus = status;
+  }
+
+  /** @return the Job ID */
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** @return the job finish time */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** @return the number of finished maps */
+  public int getFinishedMaps() {
+    return finishedMaps;
+  }
+
+  /** @return the number of finished reduces */
+  public int getFinishedReduces() {
+    return finishedReduces;
+  }
+
+  /** @return the job status given at construction */
+  public String getStatus() {
+    return jobStatus;
+  }
+
+  /**
+   * Get the event type.
+   *
+   * @return {@link EventType#JOB_FAILED} when {@link #getStatus()} equals
+   *     {@code "FAILED"}; {@link EventType#JOB_KILLED} for every other
+   *     status, including {@code null}
+   */
+  public EventType getEventType() {
+    // Constant-first equals keeps a null status from throwing.
+    return "FAILED".equals(getStatus())
+        ? EventType.JOB_FAILED
+        : EventType.JOB_KILLED;
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Fri Mar 4 04:22:59 2011
@@ -23,31 +23,26 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
/**
* A simple wrapper for parsing JSON-encoded data using ObjectMapper.
- * @param <T> The (base) type of the object(s) to be parsed by this parser.
+ *
+ * @param <T>
+ * The (base) type of the object(s) to be parsed by this parser.
*/
class JsonObjectMapperParser<T> implements Closeable {
private final ObjectMapper mapper;
private final Class<? extends T> clazz;
private final JsonParser jsonParser;
- private final Decompressor decompressor;
/**
* Constructor.
*
- * @param path
+ * @param path
* Path to the JSON data file, possibly compressed.
* @param conf
* @throws IOException
@@ -58,17 +53,7 @@ class JsonObjectMapperParser<T> implemen
mapper.configure(
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
this.clazz = clazz;
- FileSystem fs = path.getFileSystem(conf);
- CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
- InputStream input;
- if (codec == null) {
- input = fs.open(path);
- decompressor = null;
- } else {
- FSDataInputStream fsdis = fs.open(path);
- decompressor = CodecPool.getDecompressor(codec);
- input = codec.createInputStream(fsdis, decompressor);
- }
+ InputStream input = new PossiblyDecompressedInputStream(path, conf);
jsonParser = mapper.getJsonFactory().createJsonParser(input);
}
@@ -84,7 +69,6 @@ class JsonObjectMapperParser<T> implemen
mapper.configure(
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
this.clazz = clazz;
- decompressor = null;
jsonParser = mapper.getJsonFactory().createJsonParser(input);
}
@@ -105,12 +89,6 @@ class JsonObjectMapperParser<T> implemen
@Override
public void close() throws IOException {
- try {
- jsonParser.close();
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
+ jsonParser.close();
}
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
+ *
+ * @param <T> The type of the objects to be written.
+ */
+public class JsonObjectMapperWriter<T> implements Closeable {
+  private final JsonGenerator writer;
+
+  /**
+   * Create a writer that emits UTF-8 encoded JSON to the given stream.
+   *
+   * @param output the stream the JSON is written to
+   * @param prettyPrint if true, the generator's default pretty printer is
+   *     enabled
+   * @throws IOException if the JSON generator cannot be created
+   */
+  public JsonObjectMapperWriter(OutputStream output, boolean prettyPrint)
+      throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    // The generator is obtained from the mapper's own factory, so
+    // writeObject() serializes through this configured mapper.
+    writer = mapper.getJsonFactory().createJsonGenerator(
+        output, JsonEncoding.UTF8);
+    if (prettyPrint) {
+      writer.useDefaultPrettyPrinter();
+    }
+  }
+
+  /**
+   * Write one object as JSON.
+   *
+   * @param object the object to serialize
+   * @throws IOException if the generator fails to write the object
+   */
+  public void write(T object) throws IOException {
+    writer.writeObject(object);
+  }
+
+  /**
+   * Close the underlying {@link JsonGenerator}.
+   *
+   * @throws IOException if closing the generator fails
+   */
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Mar 4 04:22:59 2011
@@ -102,6 +102,24 @@ public class LoggedJob implements DeepCo
setJobID(jobID);
}
+  /**
+   * Shift this job's submit, launch and finish times by the given amount,
+   * then apply the same adjustment to every map task, reduce task and
+   * other task of the job.
+   *
+   * @param adjustment the amount added to each time
+   */
+  void adjustTimes(long adjustment) {
+    // Job-level times first.
+    submitTime += adjustment;
+    launchTime += adjustment;
+    finishTime += adjustment;
+
+    // Then each task group, in the order maps, reduces, others.
+    for (LoggedTask mapTask : mapTasks) {
+      mapTask.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask reduceTask : reduceTasks) {
+      reduceTask.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask otherTask : otherTasks) {
+      otherTask.adjustTimes(adjustment);
+    }
+  }
+
@SuppressWarnings("unused")
// for input parameter ignored.
@JsonAnySetter
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Fri Mar 4 04:22:59 2011
@@ -82,7 +82,7 @@ public class LoggedNetworkTopology imple
* @param level
* the level number
*/
- LoggedNetworkTopology(HashSet<ParsedHost> hosts, String name, int level) {
+ LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
this.name = name;
this.children = null;
@@ -119,6 +119,10 @@ public class LoggedNetworkTopology imple
}
}
+ /**
+ * Convenience constructor: delegates to the three-argument constructor
+ * with the name "<root>" and level 0.
+ *
+ * @param hosts
+ * the hosts passed through to the three-argument constructor
+ */
+ LoggedNetworkTopology(Set<ParsedHost> hosts) {
+ this(hosts, "<root>", 0);
+ }
+
public String getName() {
return name;
}