You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/02/18 19:43:30 UTC
svn commit: r911519 [3/3] - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/jobhistory/
src/test/mapred/org/apache/hadoop/tools/rumen/
src/test/tools/data/rumen/small-trace-test/
src/tools/org/apache/hadoop/tools/rumen/
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+
+/**
+ * Builds the cluster topology from host names found in job history events.
+ *
+ * Hosts are taken from the host name of finished and unsuccessfully
+ * completed task attempts, and from the split locations of started tasks.
+ * {@link #build()} wraps every distinct host collected so far in a
+ * {@link LoggedNetworkTopology}.
+ */
+public class TopologyBuilder {
+  /** Distinct hosts collected so far. */
+  private final Set<ParsedHost> allHosts = new HashSet<ParsedHost>();
+
+  /**
+   * Process one {@link HistoryEvent}.
+   *
+   * Only {@link TaskAttemptFinishedEvent},
+   * {@link TaskAttemptUnsuccessfulCompletionEvent} and
+   * {@link TaskStartedEvent} are inspected; events of any other type are
+   * ignored.
+   *
+   * @param event
+   *          The {@link HistoryEvent} to be processed.
+   */
+  public void process(HistoryEvent event) {
+    if (event instanceof TaskAttemptFinishedEvent) {
+      processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+      processTaskAttemptUnsuccessfulCompletionEvent(
+          (TaskAttemptUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof TaskStartedEvent) {
+      processTaskStartedEvent((TaskStartedEvent) event);
+    }
+
+    // These checks are intentionally not exhaustive.
+  }
+
+  /**
+   * Process a collection of JobConf {@link Properties}. May be called any
+   * number of times.
+   *
+   * Currently the properties are ignored: the topology is built from history
+   * events only.
+   *
+   * @param conf
+   *          The job conf properties to be added.
+   */
+  public void process(Properties conf) {
+    // no code
+  }
+
+  /**
+   * Request the builder to build the final object. Callers should not feed
+   * further events or job-conf properties afterwards; this is not enforced.
+   *
+   * NOTE(review): the internal host set is handed to the
+   * {@link LoggedNetworkTopology} constructor as-is; whether it is copied
+   * there is not visible from this class.
+   *
+   * @return Parsed {@link LoggedNetworkTopology} object.
+   */
+  public LoggedNetworkTopology build() {
+    return new LoggedNetworkTopology(allHosts);
+  }
+
+  /** Records every host listed in the started task's split locations. */
+  private void processTaskStartedEvent(TaskStartedEvent event) {
+    preferredLocationForSplits(event.getSplitLocations());
+  }
+
+  /** Records the host name reported by an unsuccessful task attempt. */
+  private void processTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    recordParsedHost(event.getHostname());
+  }
+
+  /** Records the host name reported by a finished task attempt. */
+  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname());
+  }
+
+  /**
+   * Parses a host name and adds it to {@link #allHosts}. Null names, and
+   * names for which {@link ParsedHost#parse} returns null, are skipped.
+   */
+  private void recordParsedHost(String hostName) {
+    // Guard explicitly instead of relying on how ParsedHost.parse treats a
+    // null argument (not visible from this class).
+    if (hostName == null) {
+      return;
+    }
+
+    ParsedHost result = ParsedHost.parse(hostName);
+
+    // Set.add ignores duplicates, so no separate contains() check is needed.
+    if (result != null) {
+      allHosts.add(result);
+    }
+  }
+
+  /**
+   * Records each host in a comma-separated list of split locations. A null
+   * list is ignored.
+   */
+  private void preferredLocationForSplits(String splits) {
+    if (splits == null) {
+      return;
+    }
+
+    // StringTokenizer skips empty tokens, so consecutive or trailing commas
+    // produce no empty host names.
+    StringTokenizer tok = new StringTokenizer(splits, ",", false);
+
+    while (tok.hasMoreTokens()) {
+      recordParsedHost(tok.nextToken());
+    }
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The main driver of the Rumen Parser.
+ *
+ * Command line: {@code [-demuxer <class>] <trace-output> <topology-output>
+ * <input>...}. An input that is a directory is expanded to its children
+ * (excluding names ending in ".crc"), sorted by name. Each file produced by
+ * the input demuxer is routed by file name either to the job configuration
+ * parser or to a job history parser. Built jobs are written to the trace
+ * output and the collected hosts to the topology output.
+ */
+public class TraceBuilder extends Configured implements Tool {
+  static final private Log LOG = LogFactory.getLog(TraceBuilder.class);
+
+  /** Exit status used by {@link #main} when {@link #run} throws. */
+  static final int RUN_METHOD_FAILED_EXIT_CODE = 3;
+
+  TopologyBuilder topologyBuilder = new TopologyBuilder();
+  JobConfigurationParser jobConfParser;
+  Outputter<LoggedJob> traceWriter;
+  Outputter<LoggedNetworkTopology> topologyWriter;
+
+  // Needs to be interpreted greedily or otherwise constrained
+  static final String jobIDRegex = "job_[0-9]+_[0-9]+";
+
+  // Matches job configuration file names; returns jobID in Capturing Group 1.
+  // The dot before "xml" is escaped so it only matches a literal '.'. An
+  // optional extra alphanumeric extension is accepted after ".xml".
+  static final Pattern confFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex
+          + ")_conf\\.xml(?:\\.[0-9a-zA-Z]+)?");
+
+  // This can match text that confFileNameRegex will also match. The code
+  // gives precedence to confFileNameRegex . Returns jobID
+  // in Capturing Group 1
+  static final Pattern jobFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
+
+  /** Parsed command-line options. */
+  static class MyOptions {
+    Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
+
+    @SuppressWarnings("unchecked")
+    Class<? extends Outputter> clazzTraceOutputter = DefaultOutputter.class;
+    Path traceOutput;
+    Path topologyOutput;
+
+    List<Path> inputs = new LinkedList<Path>();
+
+    /**
+     * Parses {@code [-demuxer <class>] <trace-output> <topology-output>
+     * <input>...} and expands directory inputs.
+     *
+     * @throws IllegalArgumentException on an unknown switch, a missing
+     *           switch argument, or fewer than two positional arguments
+     */
+    MyOptions(String[] args, Configuration conf) throws FileNotFoundException,
+        IOException, ClassNotFoundException {
+      int switchTop = 0;
+
+      // Consume leading switches. Every branch must advance switchTop or
+      // throw; otherwise the loop would never terminate.
+      while (switchTop < args.length && args[switchTop].startsWith("-")) {
+        String option = args[switchTop];
+
+        if (option.equalsIgnoreCase("-demuxer")) {
+          if (switchTop + 1 >= args.length) {
+            throw new IllegalArgumentException(
+                "Missing class name after " + option);
+          }
+          inputDemuxerClass =
+              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
+
+          ++switchTop;
+        } else {
+          throw new IllegalArgumentException("Unknown option: " + option);
+        }
+      }
+
+      if (args.length - switchTop < 2) {
+        throw new IllegalArgumentException(
+            "Usage: TraceBuilder [-demuxer <class>] <trace-output>"
+                + " <topology-output> <input>...");
+      }
+
+      traceOutput = new Path(args[switchTop]);
+      topologyOutput = new Path(args[switchTop + 1]);
+
+      for (int i = switchTop + 2; i < args.length; ++i) {
+        Path thisPath = new Path(args[i]);
+        FileSystem fs = thisPath.getFileSystem(conf);
+
+        if (fs.getFileStatus(thisPath).isDir()) {
+          List<String> dirNames = new ArrayList<String>();
+
+          for (FileStatus status : fs.listStatus(thisPath)) {
+            String name = status.getPath().getName();
+
+            // Skip .crc files.
+            if (!name.endsWith(".crc")) {
+              dirNames.add(name);
+            }
+          }
+
+          // new String[0], not new String[1]: with a presized array an empty
+          // directory would yield a single null entry and crash new Path().
+          String[] sortableNames = dirNames.toArray(new String[0]);
+
+          Arrays.sort(sortableNames);
+
+          for (String dirName : sortableNames) {
+            inputs.add(new Path(thisPath, dirName));
+          }
+        } else {
+          inputs.add(thisPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the tool via {@link ToolRunner}, then releases the writers. Exits
+   * the JVM with the tool's status when it is non-zero, or with
+   * {@link #RUN_METHOD_FAILED_EXIT_CODE} when the tool throws.
+   */
+  public static void main(String[] args) throws Exception {
+    TraceBuilder builder = new TraceBuilder();
+    int result = RUN_METHOD_FAILED_EXIT_CODE;
+
+    try {
+      result = ToolRunner.run(builder, args);
+    } catch (Throwable t) {
+      // The System.exit() below would otherwise discard this failure without
+      // any report.
+      LOG.error("TraceBuilder failed", t);
+    } finally {
+      try {
+        builder.finish();
+      } finally {
+        if (result != 0) {
+          System.exit(result);
+        }
+      }
+    }
+  }
+
+  /** Returns capturing group 1 if the whole name matches, otherwise null. */
+  private static String applyParser(String fileName, Pattern pattern) {
+    Matcher matcher = pattern.matcher(fileName);
+
+    if (!matcher.matches()) {
+      return null;
+    }
+
+    return matcher.group(1);
+  }
+
+  /**
+   * @param fileName
+   * @return the jobID String parsed out of the file name using
+   *         {@link #jobFileNameRegex}, or null if it does not match. Both
+   *         history log names and config file names match. Names starting
+   *         with '.' never match, so hidden files such as ".name.crc" yield
+   *         null.
+   */
+  static String extractJobID(String fileName) {
+    return applyParser(fileName, jobFileNameRegex);
+  }
+
+  /**
+   * Decides from the file name alone whether a file is a job configuration
+   * file. The {@code input} stream is not read.
+   */
+  static boolean isJobConfXml(String fileName, InputStream input) {
+    return applyParser(fileName, confFileNameRegex) != null;
+  }
+
+  /** Appends all of {@code names} to {@code interestedProperties}. */
+  private void addInterestedProperties(List<String> interestedProperties,
+      String[] names) {
+    interestedProperties.addAll(Arrays.asList(names));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int run(String[] args) throws Exception {
+    MyOptions options = new MyOptions(args, getConf());
+    List<String> interestedProperties = new ArrayList<String>();
+
+    for (JobConfPropertyNames candidateSet : JobConfPropertyNames.values()) {
+      addInterestedProperties(interestedProperties,
+          candidateSet.getCandidates());
+    }
+
+    jobConfParser = new JobConfigurationParser(interestedProperties);
+    traceWriter = options.clazzTraceOutputter.newInstance();
+    traceWriter.init(options.traceOutput, getConf());
+    topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
+    topologyWriter.init(options.topologyOutput, getConf());
+
+    try {
+      JobBuilder jobBuilder = null;
+
+      for (Path p : options.inputs) {
+        InputDemuxer inputDemuxer = options.inputDemuxerClass.newInstance();
+
+        // The finally also covers bindTo(), so a demuxer that fails to bind
+        // is still closed. A bindTo() failure still propagates out of run().
+        try {
+          inputDemuxer.bindTo(p, getConf());
+
+          Pair<String, InputStream> filePair = null;
+
+          try {
+            while ((filePair = inputDemuxer.getNext()) != null) {
+              RewindableInputStream ris =
+                  new RewindableInputStream(filePair.second());
+
+              JobHistoryParser parser = null;
+
+              try {
+                String jobID = extractJobID(filePair.first());
+                if (jobID == null) {
+                  LOG.warn("File skipped: Invalid file name: "
+                      + filePair.first());
+                  continue;
+                }
+
+                // Consecutive files with the same job ID feed one JobBuilder.
+                // A new job ID emits the finished job and starts a new
+                // builder, so a job's files are presumably expected to arrive
+                // consecutively.
+                if (jobBuilder == null
+                    || !jobBuilder.getJobID().equals(jobID)) {
+                  if (jobBuilder != null) {
+                    traceWriter.output(jobBuilder.build());
+                  }
+                  jobBuilder = new JobBuilder(jobID);
+                }
+
+                if (isJobConfXml(filePair.first(), ris)) {
+                  processJobConf(jobConfParser.parse(ris.rewind()), jobBuilder);
+                } else {
+                  parser = JobHistoryParserFactory.getParser(ris);
+                  if (parser == null) {
+                    LOG.warn("File skipped: Cannot find suitable parser: "
+                        + filePair.first());
+                  } else {
+                    processJobHistory(parser, jobBuilder);
+                  }
+                }
+              } finally {
+                // Close the parser if one was created, otherwise the raw
+                // stream; this assumes closing the parser also closes ris.
+                if (parser == null) {
+                  ris.close();
+                } else {
+                  parser.close();
+                }
+              }
+            }
+          } catch (Throwable t) {
+            // Best effort: an error abandons the remaining files of this
+            // input, but processing continues with the next input.
+            if (filePair != null) {
+              LOG.warn("TraceBuilder got an error while processing file "
+                  + filePair.first(), t);
+            } else {
+              LOG.warn("TraceBuilder got an error while reading input " + p, t);
+            }
+          }
+        } finally {
+          inputDemuxer.close();
+        }
+      }
+
+      if (jobBuilder != null) {
+        traceWriter.output(jobBuilder.build());
+      } else {
+        LOG.warn("No job found in traces: ");
+      }
+
+      topologyWriter.output(topologyBuilder.build());
+    } finally {
+      traceWriter.close();
+      topologyWriter.close();
+    }
+
+    return 0;
+  }
+
+  /** Feeds job configuration properties to both builders. */
+  private void processJobConf(Properties properties, JobBuilder jobBuilder) {
+    jobBuilder.process(properties);
+    topologyBuilder.process(properties);
+  }
+
+  /**
+   * Feeds every event from {@code parser} to both builders, then closes the
+   * parser. NOTE(review): run() closes the parser again afterwards, so this
+   * relies on JobHistoryParser.close() tolerating a second call.
+   */
+  void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
+      throws IOException {
+    HistoryEvent e;
+    while ((e = parser.nextEvent()) != null) {
+      jobBuilder.process(e);
+      topologyBuilder.process(e);
+    }
+
+    parser.close();
+  }
+
+  /** Closes both writers, logging rather than propagating close failures. */
+  void finish() {
+    IOUtils.cleanup(LOG, traceWriter, topologyWriter);
+  }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+// This class exists to hold a bunch of static utils. It's never instantiated.
+// Presumably these helpers interpret 0.20-era job history logs, per the
+// class name.
+abstract class Version20LogInterfaceUtils {
+
+  /**
+   * Maps a task type name to a {@link TaskType}.
+   *
+   * The name of any {@link TaskType} constant maps to that constant. In
+   * addition, "CLEANUP" maps to {@link TaskType#JOB_CLEANUP} and "SETUP" maps
+   * to {@link TaskType#JOB_SETUP}.
+   *
+   * @param taskType the task type name; may be null
+   * @return the matching {@link TaskType}, or null when the name is null or
+   *         not recognized
+   */
+  static TaskType get20TaskType(String taskType) {
+    // Enum.valueOf throws NullPointerException (not IllegalArgumentException)
+    // for a null name, which would escape the handler below; treat null like
+    // any other unrecognized name.
+    if (taskType == null) {
+      return null;
+    }
+
+    try {
+      return TaskType.valueOf(taskType);
+    } catch (IllegalArgumentException e) {
+      // Not a TaskType constant name: fall back to the recognized aliases.
+      if ("CLEANUP".equals(taskType)) {
+        return TaskType.JOB_CLEANUP;
+      }
+
+      if ("SETUP".equals(taskType)) {
+        return TaskType.JOB_SETUP;
+      }
+
+      return null;
+    }
+  }
+
+}