Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/02/18 19:43:30 UTC

svn commit: r911519 [3/3] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+
+/**
+ * Builds the cluster topology from job history events.
+ */
+public class TopologyBuilder {
+  private Set<ParsedHost> allHosts = new HashSet<ParsedHost>();
+
+  /**
+   * Process one {@link HistoryEvent}.
+   * 
+   * @param event
+   *          The {@link HistoryEvent} to be processed.
+   */
+  public void process(HistoryEvent event) {
+    if (event instanceof TaskAttemptFinishedEvent) {
+      processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+      processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof TaskStartedEvent) {
+      processTaskStartedEvent((TaskStartedEvent) event);
+    }
+
+    // I do NOT expect these if statements to be exhaustive.
+  }
+
+  /**
+   * Process a collection of JobConf {@link Properties}. This method may be
+   * called more than once.
+   * 
+   * @param conf
+   *          The job conf properties to be added.
+   */
+  public void process(Properties conf) {
+    // Intentionally a no-op: the topology is currently derived from task
+    // events alone, not from job-conf properties.
+  }
+
+  /**
+   * Request the builder to build the final object. Once this has been
+   * called, the {@link TopologyBuilder} will accept no more events or
+   * job-conf properties.
+   * 
+   * @return Parsed {@link LoggedNetworkTopology} object.
+   */
+  public LoggedNetworkTopology build() {
+    return new LoggedNetworkTopology(allHosts);
+  }
+
+  private void processTaskStartedEvent(TaskStartedEvent event) {
+    preferredLocationForSplits(event.getSplitLocations());
+  }
+
+  private void processTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    recordParsedHost(event.getHostname());
+  }
+
+  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname());
+  }
+
+  private void recordParsedHost(String hostName) {
+    ParsedHost result = ParsedHost.parse(hostName);
+
+    if (result != null) {
+      // Set semantics make an explicit contains() check redundant.
+      allHosts.add(result);
+    }
+  }
+
+  private void preferredLocationForSplits(String splits) {
+    if (splits != null) {
+      StringTokenizer tok = new StringTokenizer(splits, ",", false);
+
+      while (tok.hasMoreTokens()) {
+        String nextSplit = tok.nextToken();
+
+        recordParsedHost(nextSplit);
+      }
+    }
+  }
+}
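
Usage, in a nutshell (a minimal sketch, not part of this commit): feed each
parsed HistoryEvent to the builder, then call build() exactly once at the
end. Here "parser" is assumed to be an already-opened JobHistoryParser,
e.g. obtained from JobHistoryParserFactory the way TraceBuilder does below.

    TopologyBuilder topologyBuilder = new TopologyBuilder();

    // Drain the history stream; TopologyBuilder quietly ignores event
    // types it does not care about.
    HistoryEvent e;
    while ((e = parser.nextEvent()) != null) {
      topologyBuilder.process(e);
    }

    // After build(), the builder should receive no further events.
    LoggedNetworkTopology topology = topologyBuilder.build();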

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The main driver of the Rumen Parser.
+ */
+public class TraceBuilder extends Configured implements Tool {
+  static final private Log LOG = LogFactory.getLog(TraceBuilder.class);
+
+  static final int RUN_METHOD_FAILED_EXIT_CODE = 3;
+
+  TopologyBuilder topologyBuilder = new TopologyBuilder();
+  JobConfigurationParser jobConfParser;
+  Outputter<LoggedJob> traceWriter;
+  Outputter<LoggedNetworkTopology> topologyWriter;
+
+  // Needs to be interpreted greedily or otherwise constrained
+  static final String jobIDRegex = "job_[0-9]+_[0-9]+";
+
+  // returns jobID in Capturing Group 1
+  static final Pattern confFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex
+          + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
+
+  // This can match text that confFileNameRegex will also match; the code
+  // gives precedence to confFileNameRegex. Returns jobID in Capturing
+  // Group 1.
+  static final Pattern jobFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
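+
+  // For example, "host_job_200902180000_0001_conf.xml" matches both
+  // patterns (confFileNameRegex takes precedence), while
+  // "host_job_200902180000_0001_hadoop_wordcount" matches only
+  // jobFileNameRegex; in both cases group 1 is "job_200902180000_0001".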
+
+  static class MyOptions {
+    Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
+
+    @SuppressWarnings("unchecked")
+    Class<? extends Outputter> clazzTraceOutputter = DefaultOutputter.class;
+    Path traceOutput;
+    Path topologyOutput;
+
+    List<Path> inputs = new LinkedList<Path>();
+
+    MyOptions(String[] args, Configuration conf) throws FileNotFoundException,
+        IOException, ClassNotFoundException {
+      int switchTop = 0;
+
+      while (args[switchTop].startsWith("-")) {
+        if (args[switchTop].equalsIgnoreCase("-demuxer")) {
+          inputDemuxerClass =
+              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
+
+          ++switchTop;
+        } else {
+          // Without this branch, an unrecognized switch would loop forever.
+          throw new IllegalArgumentException("Unrecognized switch: "
+              + args[switchTop]);
+        }
+      }
+
+      traceOutput = new Path(args[0 + switchTop]);
+      topologyOutput = new Path(args[1 + switchTop]);
+
+      for (int i = 2 + switchTop; i < args.length; ++i) {
+
+        Path thisPath = new Path(args[i]);
+
+        FileSystem fs = thisPath.getFileSystem(conf);
+        if (fs.getFileStatus(thisPath).isDir()) {
+          FileStatus[] statuses = fs.listStatus(thisPath);
+
+          List<String> dirNames = new ArrayList<String>();
+
+          for (int j = 0; j < statuses.length; ++j) {
+            String name = statuses[j].getPath().getName();
+
+            // Skip checksum files.
+            if (!name.endsWith(".crc")) {
+              dirNames.add(name);
+            }
+          }
+
+          // new String[0], not new String[1]: with an empty directory the
+          // latter would leave a single null entry in the array.
+          String[] sortableNames = dirNames.toArray(new String[0]);
+
+          Arrays.sort(sortableNames);
+
+          for (String dirName : sortableNames) {
+            inputs.add(new Path(thisPath, dirName));
+          }
+        } else {
+          inputs.add(thisPath);
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TraceBuilder builder = new TraceBuilder();
+    int result = RUN_METHOD_FAILED_EXIT_CODE;
+
+    try {
+      result = ToolRunner.run(builder, args);
+    } finally {
+      try {
+        builder.finish();
+      } finally {
+        // Note: a return or System.exit() from this finally block discards
+        // any exception thrown by run() or finish().
+        if (result == 0) {
+          return;
+        }
+
+        System.exit(result);
+      }
+    }
+  }
+
+  private static String applyParser(String fileName, Pattern pattern) {
+    Matcher matcher = pattern.matcher(fileName);
+
+    if (!matcher.matches()) {
+      return null;
+    }
+
+    return matcher.group(1);
+  }
+
+  /**
+   * @param fileName the name of the file to examine
+   * @return the jobID string parsed out of the file name. We return a valid
+   *         string for either a history log file or a config file; otherwise
+   *         (notably for .crc files) we return null.
+   */
+  static String extractJobID(String fileName) {
+    return applyParser(fileName, jobFileNameRegex);
+  }
+
+  static boolean isJobConfXml(String fileName, InputStream input) {
+    // The input stream is currently unused; the decision is based solely on
+    // the file name.
+    return applyParser(fileName, confFileNameRegex) != null;
+  }
+
+  private void addInterestedProperties(List<String> interestedProperties,
+      String[] names) {
+    for (String name : names) {
+      interestedProperties.add(name);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int run(String[] args) throws Exception {
+    MyOptions options = new MyOptions(args, getConf());
+    List<String> interestedProperties = new ArrayList<String>();
+    for (JobConfPropertyNames candidateSet : JobConfPropertyNames.values()) {
+      addInterestedProperties(interestedProperties, candidateSet
+          .getCandidates());
+    }
+    jobConfParser = new JobConfigurationParser(interestedProperties);
+    traceWriter = options.clazzTraceOutputter.newInstance();
+    traceWriter.init(options.traceOutput, getConf());
+    topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
+    topologyWriter.init(options.topologyOutput, getConf());
+
+    try {
+      JobBuilder jobBuilder = null;
+
+      for (Path p : options.inputs) {
+        // Class.newInstance() either returns an instance or throws, so no
+        // null check of inputDemuxer is needed.
+        InputDemuxer inputDemuxer = options.inputDemuxerClass.newInstance();
+
+        inputDemuxer.bindTo(p, getConf());
+
+        Pair<String, InputStream> filePair = null;
+
+        try {
+          while ((filePair = inputDemuxer.getNext()) != null) {
+            RewindableInputStream ris =
+                new RewindableInputStream(filePair.second());
+
+            JobHistoryParser parser = null;
+
+            try {
+              String jobID = extractJobID(filePair.first());
+              if (jobID == null) {
+                LOG.warn("File skipped: Invalid file name: "
+                    + filePair.first());
+                continue;
+              }
+              // Starting a new job: flush the previous one first.
+              if ((jobBuilder == null)
+                  || (!jobBuilder.getJobID().equals(jobID))) {
+                if (jobBuilder != null) {
+                  traceWriter.output(jobBuilder.build());
+                }
+                jobBuilder = new JobBuilder(jobID);
+              }
+
+              if (isJobConfXml(filePair.first(), ris)) {
+                processJobConf(jobConfParser.parse(ris.rewind()), jobBuilder);
+              } else {
+                parser = JobHistoryParserFactory.getParser(ris);
+                if (parser == null) {
+                  LOG.warn("File skipped: Cannot find suitable parser: "
+                      + filePair.first());
+                } else {
+                  processJobHistory(parser, jobBuilder);
+                }
+              }
+            } finally {
+              if (parser == null) {
+                ris.close();
+              } else {
+                // Closing the parser is assumed to close the wrapped
+                // stream as well.
+                parser.close();
+                parser = null;
+              }
+            }
+          }
+        } catch (Throwable t) {
+          // Log the failure even when it occurs before the first file
+          // pair is read (filePair still null).
+          LOG.warn("TraceBuilder got an error while processing "
+              + ((filePair == null) ? ("input " + p)
+                  : ("file " + filePair.first())), t);
+        } finally {
+          inputDemuxer.close();
+        }
+      }
+      if (jobBuilder != null) {
+        traceWriter.output(jobBuilder.build());
+        jobBuilder = null;
+      } else {
+        LOG.warn("No job found in traces: ");
+      }
+
+      topologyWriter.output(topologyBuilder.build());
+    } finally {
+      traceWriter.close();
+      topologyWriter.close();
+    }
+
+    return 0;
+  }
+
+  private void processJobConf(Properties properties, JobBuilder jobBuilder) {
+    jobBuilder.process(properties);
+    topologyBuilder.process(properties);
+  }
+
+  void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
+      throws IOException {
+    HistoryEvent e;
+    while ((e = parser.nextEvent()) != null) {
+      jobBuilder.process(e);
+      topologyBuilder.process(e);
+    }
+
+    // run() closes the parser again in its finally block; close() is
+    // assumed to tolerate being called twice.
+    parser.close();
+  }
+
+  void finish() {
+    IOUtils.cleanup(LOG, traceWriter, topologyWriter);
+  }
+}
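
A usage sketch for the driver (again not part of the commit; the paths are
hypothetical): TraceBuilder is a standard Tool, so it can be driven through
ToolRunner with the argument layout MyOptions parses above. Passing
-demuxer is optional here, since DefaultInputDemuxer is already the
default; note also that main() additionally calls finish() to close the
two writers.

    // [-demuxer <class>] <trace-output> <topology-output> <input>...
    String[] args = {
        "-demuxer", "org.apache.hadoop.tools.rumen.DefaultInputDemuxer",
        "file:///tmp/job-trace.json",    // hypothetical output locations
        "file:///tmp/topology.json",
        "file:///tmp/history-logs"       // a file, or a directory of files
    };
    int exitCode = ToolRunner.run(new TraceBuilder(), args);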

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+// This class exists to hold a handful of static utilities. It is never
+// instantiated.
+abstract class Version20LogInterfaceUtils {
+
+  static TaskType get20TaskType(String taskType) {
+    try {
+      return TaskType.valueOf(taskType);
+    } catch (IllegalArgumentException e) {
+      // 0.20-era history logs say CLEANUP and SETUP where the current enum
+      // uses JOB_CLEANUP and JOB_SETUP.
+      if ("CLEANUP".equals(taskType)) {
+        return TaskType.JOB_CLEANUP;
+      }
+
+      if ("SETUP".equals(taskType)) {
+        return TaskType.JOB_SETUP;
+      }
+
+      return null;
+    }
+  }
+
+}
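
For reference, a quick sketch of the mapping get20TaskType performs: MAP
and REDUCE parse directly through TaskType.valueOf, and the 0.20-specific
names are remapped by hand.

    get20TaskType("MAP");      // TaskType.MAP
    get20TaskType("REDUCE");   // TaskType.REDUCE
    get20TaskType("CLEANUP");  // TaskType.JOB_CLEANUP
    get20TaskType("SETUP");    // TaskType.JOB_SETUP
    get20TaskType("bogus");    // null -- unrecognized 0.20 task type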