You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/02/18 19:43:30 UTC

svn commit: r911519 [1/3] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/

Author: cdouglas
Date: Thu Feb 18 18:43:28 2010
New Revision: 911519

URL: http://svn.apache.org/viewvc?rev=911519&view=rev
Log:
MAPREDUCE-1309. Refactor Rumen trace generator to improve code structure
and add extensible support for log formats. Contributed by Dick King

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz   (with props)
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz   (with props)
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz   (with props)
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz   (with props)
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz   (with props)
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Feb 18 18:43:28 2010
@@ -177,6 +177,9 @@
     MAPREDUCE-1445. Refactor Sqoop tests to support better ConnManager testing.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1309. Refactor Rumen trace generator to improve code structure
+    and add extensible support for log formats. (Dick King via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Thu Feb 18 18:43:28 2010
@@ -64,8 +64,9 @@
     this.in = in;
     this.version = in.readLine();
     
-    if (!EventWriter.VERSION.equals(version))
+    if (!EventWriter.VERSION.equals(version)) {
       throw new IOException("Incompatible event log version: "+version);
+    }
     
     this.schema = Schema.parse(in.readLine());
     this.reader = new SpecificDatumReader(schema);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class ConcatenatedInputFilesDemuxer implements InputDemuxer {
+  private String name;
+  private DelimitedInputStream input;
+
+  private String knownNextFileName = null;
+
+  static private int MAXIMUM_HEADER_LINE_LENGTH = 500;
+
+  @Override
+  public void bindTo(Path path, Configuration conf) throws IOException {
+    InputStream underlyingInput = null;
+
+    if (name != null) { // re-binding before the previous one was consumed.
+      close();
+    }
+    name = path.getName();
+
+    underlyingInput = new PossiblyDecompressedInputStream(path, conf);
+
+    input =
+        new DelimitedInputStream(new BufferedInputStream(underlyingInput),
+            "\f!!FILE=", "!!\n");
+
+    knownNextFileName = input.nextFileName();
+
+    if (knownNextFileName == null) {
+      close();
+
+      return;
+    }
+
+    /*
+     * We handle files in specialized formats by trying their demuxers first,
+     * not by failing here.
+     */
+    return;
+  }
+
+  @Override
+  public Pair<String, InputStream> getNext() throws IOException {
+    if (knownNextFileName != null) {
+      Pair<String, InputStream> result =
+          new Pair<String, InputStream>(knownNextFileName, input);
+
+      knownNextFileName = null;
+
+      return result;
+    }
+
+    String nextFileName = input.nextFileName();
+
+    if (nextFileName == null) {
+      return null;
+    }
+
+    return new Pair<String, InputStream>(nextFileName, input);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (input != null) {
+      input.close();
+    }
+  }
+
+  /**
+   * A simple wrapper class to make any input stream delimited. It has an extra
+   * method, nextFileName, which reads a marker line and returns the file name.
+   * 
+   * The input stream should have lines that look like
+   * <marker><filename><endmarker> . The text <marker> should not occur
+   * elsewhere in the file. The text <endmarker> should not occur in a file
+   * name.
+   */
+  static class DelimitedInputStream extends InputStream {
+    private InputStream input;
+
+    private boolean endSeen = false;
+
+    private final String fileMarker;
+
+    private final byte[] markerBytes;
+
+    private final byte[] fileMarkerBuffer;
+
+    private final String fileEndMarker;
+
+    private final byte[] endMarkerBytes;
+
+    private final byte[] fileEndMarkerBuffer;
+
+    /**
+     * Constructor.
+     * 
+     * @param input the underlying stream to split at file marker lines
+     */
+    public DelimitedInputStream(InputStream input, String fileMarker,
+        String fileEndMarker) {
+      this.input = new BufferedInputStream(input, 10000);
+      this.input.mark(10000);
+      this.fileMarker = fileMarker;
+      this.markerBytes = this.fileMarker.getBytes();
+      this.fileMarkerBuffer = new byte[this.markerBytes.length];
+      this.fileEndMarker = fileEndMarker;
+      this.endMarkerBytes = this.fileEndMarker.getBytes();
+      this.fileEndMarkerBuffer = new byte[this.endMarkerBytes.length];
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (endSeen) {
+        return -1;
+      }
+
+      input.mark(10000);
+
+      int result = input.read();
+
+      if (result < 0) {
+        endSeen = true;
+        return result;
+      }
+
+      if (result == markerBytes[0]) {
+        input.reset();
+
+        // this might be a marker line
+        int markerReadResult =
+            input.read(fileMarkerBuffer, 0, fileMarkerBuffer.length);
+
+        input.reset();
+
+        if (markerReadResult < fileMarkerBuffer.length
+            || !fileMarker.equals(new String(fileMarkerBuffer))) {
+          return input.read();
+        }
+
+        return -1;
+      }
+
+      return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.io.InputStream#read(byte[], int, int)
+     * 
+     * This does SLIGHTLY THE WRONG THING.
+     * 
+     * If we run off the end of the segment then the caller's buffer will be
+     * dirtied beyond the point where we claim to have read. If this turns out
+     * to be a problem, save that data somewhere and restore it if needed.
+     */
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      if (endSeen) {
+        return -1;
+      }
+
+      input.mark(length + markerBytes.length + 10);
+
+      int dataSeen = input.read(buffer, offset, length);
+
+      byte[] extraReadBuffer = null;
+      int extraActualRead = -1;
+
+      // search for an instance of a file marker
+      for (int i = offset; i < offset + dataSeen; ++i) {
+        if (buffer[i] == markerBytes[0]) {
+          boolean mismatch = false;
+
+          for (int j = 1; j < Math.min(markerBytes.length, offset + dataSeen
+              - i); ++j) {
+            if (buffer[i + j] != markerBytes[j]) {
+              mismatch = true;
+              break;
+            }
+          }
+
+          if (!mismatch) {
+            // see if we have only a prefix of the markerBytes
+            int uncheckedMarkerCharCount =
+                markerBytes.length - (offset + dataSeen - i);
+
+            if (uncheckedMarkerCharCount > 0) {
+              if (extraReadBuffer == null) {
+                extraReadBuffer = new byte[markerBytes.length - 1];
+
+                extraActualRead = input.read(extraReadBuffer);
+              }
+
+              if (extraActualRead < uncheckedMarkerCharCount) {
+                input.reset();
+                return input.read(buffer, offset, length);
+              }
+
+              for (int j = 0; j < uncheckedMarkerCharCount; ++j) {
+                if (extraReadBuffer[j] != markerBytes[markerBytes.length
+                    - uncheckedMarkerCharCount + j]) {
+                  input.reset();
+                  return input.read(buffer, offset, length);
+                }
+              }
+            }
+
+            input.reset();
+
+            if (i == offset) {
+              return -1;
+            }
+
+            int result = input.read(buffer, offset, i - offset);
+            return result;
+          }
+        }
+      }
+
+      return dataSeen;
+    }
+
+    @Override
+    public int read(byte[] buffer) throws IOException {
+      return read(buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (endSeen) {
+        input.close();
+      }
+    }
+
+    String nextFileName() throws IOException {
+      return nextFileName(MAXIMUM_HEADER_LINE_LENGTH);
+    }
+
+    private String nextFileName(int bufferSize) throws IOException {
+      // the line can't contain a newline and must contain a form feed
+      byte[] buffer = new byte[bufferSize];
+
+      input.mark(bufferSize + 1);
+
+      int actualRead = input.read(buffer);
+      int mostRecentRead = actualRead;
+
+      while (actualRead < bufferSize && mostRecentRead > 0) {
+        mostRecentRead =
+            input.read(buffer, actualRead, bufferSize - actualRead);
+
+        if (mostRecentRead > 0) {
+          actualRead += mostRecentRead;
+        }
+      }
+
+      if (actualRead < markerBytes.length) {
+        input.reset();
+        return null;
+      }
+
+      for (int i = 0; i < markerBytes.length; ++i) {
+        if (markerBytes[i] != buffer[i]) {
+          input.reset();
+          return null;
+        }
+      }
+
+      for (int i = markerBytes.length; i < actualRead; ++i) {
+        if (buffer[i] == endMarkerBytes[0]) {
+          // gather the file name
+          input.reset();
+          // burn the marker
+          if (input.read(buffer, 0, markerBytes.length) < markerBytes.length) {
+            throw new IOException("Can't reread bytes I've read before.");
+          }
+          // get the file name
+          if (input.read(buffer, 0, i - markerBytes.length) < i
+              - markerBytes.length) {
+            throw new IOException("Can't reread bytes I've read before.");
+          }
+          // burn the two exclamation points and the newline
+          if (input.read(fileEndMarkerBuffer) < fileEndMarkerBuffer.length) {
+            input.reset();
+            return null;
+          }
+          for (int j = 0; j < endMarkerBytes.length; ++j) {
+            if (endMarkerBytes[j] != fileEndMarkerBuffer[j]) {
+              input.reset();
+              return null;
+            }
+          }
+
+          return new String(buffer, 0, i - markerBytes.length);
+        }
+
+        if (buffer[i] == '\n') {
+          return null;
+        }
+      }
+
+      // we ran off the end. Was the buffer too short, or is this all there was?
+      input.reset();
+
+      if (actualRead < bufferSize) {
+        return null;
+      }
+
+      return nextFileName(bufferSize * 2);
+    }
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Thu Feb 18 18:43:28 2010
@@ -18,13 +18,33 @@
 
 package org.apache.hadoop.tools.rumen;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.Test;
@@ -49,11 +69,11 @@
     final FileSystem lfs = FileSystem.getLocal(conf);
 
     final Path rootInputDir =
-        new Path(System.getProperty("test.tools.input.dir", ""))
-            .makeQualified(lfs);
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
     final Path rootTempDir =
-        new Path(System.getProperty("test.build.data", "/tmp"))
-            .makeQualified(lfs);
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
 
     final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
     final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
@@ -82,25 +102,317 @@
     final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
     final Path traceGoldFile = new Path(rootInputFile, goldTrace);
 
+    @SuppressWarnings("deprecation")
     HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
     int result = ToolRunner.run(analyzer, args);
     assertEquals("Non-zero exit", 0, result);
 
     TestRumenJobTraces
-        .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+        .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyFile,
+            topologyGoldFile, LoggedNetworkTopology.class, "topology");
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, traceFile,
+        traceGoldFile, LoggedJob.class, "trace");
+  }
+
+  @Test
+  public void testRumenViaDispatch() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+    lfs.delete(tempDir, true);
+
+    final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+    final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+    final Path inputPath =
+        new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz");
+
+    System.out.println("topology result file = " + topologyPath);
+    System.out.println("trace result file = " + tracePath);
+
+    String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName();
+
+    String[] args =
+        { "-demuxer", demuxerClassName, tracePath.toString(),
+            topologyPath.toString(), inputPath.toString() };
+
+    final Path topologyGoldFile =
+        new Path(rootInputPath, "dispatch-topology-output.json.gz");
+    final Path traceGoldFile =
+        new Path(rootInputPath, "dispatch-trace-output.json.gz");
+
+    Tool analyzer = new TraceBuilder();
+    int result = ToolRunner.run(analyzer, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenJobTraces
+        .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyPath,
             topologyGoldFile, LoggedNetworkTopology.class, "topology");
-    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
         traceGoldFile, LoggedJob.class, "trace");
   }
 
+  @Test
+  public void testHadoop20JHParser() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    boolean success = false;
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+    lfs.delete(tempDir, true);
+
+    final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
+    final Path goldPath =
+        new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
+
+    InputStream inputLogStream =
+        new PossiblyDecompressedInputStream(inputPath, conf);
+
+    InputStream inputGoldStream =
+        new PossiblyDecompressedInputStream(goldPath, conf);
+
+    BufferedInputStream bis = new BufferedInputStream(inputLogStream);
+    bis.mark(10000);
+    Hadoop20JHParser parser = new Hadoop20JHParser(bis);
+
+    final Path resultPath = new Path(tempDir, "result.text");
+
+    System.out.println("testHadoop20JHParser sent its output to " + resultPath);
+
+    Compressor compressor;
+
+    FileSystem fs = resultPath.getFileSystem(conf);
+    CompressionCodec codec =
+        new CompressionCodecFactory(conf).getCodec(resultPath);
+    OutputStream output;
+    if (codec != null) {
+      compressor = CodecPool.getCompressor(codec);
+      output = codec.createOutputStream(fs.create(resultPath), compressor);
+    } else {
+      output = fs.create(resultPath);
+    }
+
+    PrintStream printStream = new PrintStream(output);
+
+    try {
+      assertEquals("Hadoop20JHParser can't parse the test file", true,
+          Hadoop20JHParser.canParse(inputLogStream));
+
+      bis.reset();
+
+      HistoryEvent event = parser.nextEvent();
+
+      while (event != null) {
+        printStream.println(event.getClass().getCanonicalName());
+        event = parser.nextEvent();
+      }
+
+      printStream.close();
+
+      LineReader goldLines = new LineReader(inputGoldStream);
+      LineReader resultLines =
+          new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
+
+      int lineNumber = 1;
+
+      try {
+        Text goldLine = new Text();
+        Text resultLine = new Text();
+
+        int goldRead = goldLines.readLine(goldLine);
+        int resultRead = resultLines.readLine(resultLine);
+
+        while (goldRead * resultRead != 0) {
+          if (!goldLine.equals(resultLine)) {
+            assertEquals("Type mismatch detected", goldLine, resultLine);
+            break;
+          }
+
+          goldRead = goldLines.readLine(goldLine);
+          resultRead = resultLines.readLine(resultLine);
+
+          ++lineNumber;
+        }
+
+        if (goldRead != resultRead) {
+          assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
+              + " file contains more text at line " + lineNumber, goldRead,
+              resultRead);
+        }
+
+        success = true;
+      } finally {
+        goldLines.close();
+        resultLines.close();
+
+        if (success) {
+          lfs.delete(resultPath, false);
+        }
+      }
+
+    } finally {
+      if (parser == null) {
+        inputLogStream.close();
+      } else {
+        if (parser != null) {
+          parser.close();
+        }
+      }
+
+      if (inputGoldStream != null) {
+        inputGoldStream.close();
+      }
+
+      // it's okay to do this twice [if we get an error on input]
+      printStream.close();
+    }
+  }
+
+  @Test
+  public void testJobConfigurationParser() throws Exception {
+    String[] list1 =
+        { "mapred.job.queue.name", "mapreduce.job.name",
+            "mapred.child.java.opts" };
+
+    String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
+
+    List<String> interested1 = new ArrayList<String>();
+    for (String interested : list1) {
+      interested1.add(interested);
+    }
+
+    List<String> interested2 = new ArrayList<String>();
+    for (String interested : list2) {
+      interested2.add(interested);
+    }
+
+    JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
+    JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
+
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    @SuppressWarnings("deprecation")
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+
+    final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
+
+    InputStream inputConfStream =
+        new PossiblyDecompressedInputStream(inputPath, conf);
+
+    try {
+      Properties props1 = jcp1.parse(inputConfStream);
+      inputConfStream.close();
+
+      inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
+      Properties props2 = jcp2.parse(inputConfStream);
+
+      assertEquals("testJobConfigurationParser: wrong number of properties", 3,
+          props1.size());
+      assertEquals("testJobConfigurationParser: wrong number of properties", 2,
+          props2.size());
+
+      assertEquals("prop test 1", "TheQueue", props1
+          .get("mapred.job.queue.name"));
+      assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
+      assertEquals("prop test 3",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
+              .get("mapred.child.java.opts"));
+      assertEquals("prop test 4", "TheQueue", props2
+          .get("mapred.job.queue.name"));
+      assertEquals("prop test 5",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
+              .get("mapred.child.java.opts"));
+
+    } finally {
+      inputConfStream.close();
+    }
+  }
+
+  @Test
+  public void testTopologyBuilder() throws Exception {
+    final TopologyBuilder subject = new TopologyBuilder();
+
+    // currently we extract no host names from the Properties
+    subject.process(new Properties());
+
+    subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_0"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+        "SUCCESS", null));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_1"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+        "MACHINE_EXPLODED"));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_2"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+        "MACHINE_EXPLODED"));
+    subject.process(new TaskStartedEvent(TaskID
+        .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
+        .valueOf("MAP"),
+        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com"));
+
+    final LoggedNetworkTopology topology = subject.build();
+
+    List<LoggedNetworkTopology> racks = topology.getChildren();
+
+    assertEquals("Wrong number of racks", 2, racks.size());
+
+    boolean sawSingleton = false;
+    boolean sawDoubleton = false;
+
+    for (LoggedNetworkTopology rack : racks) {
+      List<LoggedNetworkTopology> nodes = rack.getChildren();
+      if (rack.getName().endsWith(".64")) {
+        assertEquals("The singleton rack has the wrong number of elements", 1,
+            nodes.size());
+        sawSingleton = true;
+      } else if (rack.getName().endsWith(".80")) {
+        assertEquals("The doubleton rack has the wrong number of elements", 2,
+            nodes.size());
+        sawDoubleton = true;
+      } else {
+        assertTrue("Unrecognized rack name", false);
+      }
+    }
+
+    assertTrue("Did not see singleton rack", sawSingleton);
+    assertTrue("Did not see doubleton rack", sawDoubleton);
+  }
+
   static private <T extends DeepCompare> void jsonFileMatchesGold(
-      FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+      Configuration conf, Path result, Path gold, Class<? extends T> clazz,
       String fileDescription) throws IOException {
     JsonObjectMapperParser<T> goldParser =
-        new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
-    InputStream resultStream = lfs.open(result);
+        new JsonObjectMapperParser<T>(gold, clazz, conf);
     JsonObjectMapperParser<T> resultParser =
-        new JsonObjectMapperParser<T>(resultStream, clazz);
+        new JsonObjectMapperParser<T>(result, clazz, conf);
     try {
       while (true) {
         DeepCompare goldJob = goldParser.getNext();

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml (added)
+++ hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml Thu Feb 18 18:43:28 2010
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+   <property>
+      <name>mapred.job.queue.name</name><value>TheQueue</value>
+   </property>
+   <property>
+      <name>mapreduce.job.name</name><value>job_0001</value>
+   </property>
+   <property>
+      <name>maproduce.uninteresting.property</name><value>abcdef</value>
+   </property>
+   <property><name>mapred.child.java.opts</name><value>-server -Xmx640m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+
+/**
+ * {@link JobHistoryParser} that parses {@link JobHistory} files produced by
+ * {@link org.apache.hadoop.mapreduce.jobhistory.JobHistory} in the same source
+ * code tree as rumen.
+ */
+public class CurrentJHParser implements JobHistoryParser {
+  private EventReader reader;
+
+  private static class ForkedDataInputStream extends DataInputStream {
+    ForkedDataInputStream(InputStream input) {
+      super(input);
+    }
+
+    @Override
+    public void close() {
+      // no code
+    }
+  }
+
+  /**
+   * Can this parser parse the input?
+   * 
+   * @param input
+   * @return Whether this parser can parse the input.
+   * @throws IOException
+   */
+  public static boolean canParse(InputStream input) throws IOException {
+    final DataInputStream in = new ForkedDataInputStream(input);
+
+    try {
+      final EventReader reader = new EventReader(in);
+
+      try {
+        reader.getNextEvent();
+      } catch (IOException e) {
+        return false;
+      } finally {
+        reader.close();
+      }
+    } catch (IOException e) {
+      return false;
+    }
+
+    return true;
+  }
+
+  public CurrentJHParser(InputStream input) throws IOException {
+    reader = new EventReader(new DataInputStream(input));
+  }
+
+  @Override
+  public HistoryEvent nextEvent() throws IOException {
+    return reader.getNextEvent();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link DefaultInputDemuxer} acts as a pass-through demuxer. It just opens
+ * each file and returns the input stream. If the input is compressed, it
+ * returns a decompression stream instead.
+ */
+public class DefaultInputDemuxer implements InputDemuxer {
+  String name;
+  InputStream input;
+
+  @Override
+  public void bindTo(Path path, Configuration conf) throws IOException {
+    if (name != null) { // re-binding before the previous one was consumed.
+      close();
+    }
+    name = path.getName();
+
+    input = new PossiblyDecompressedInputStream(path, conf);
+
+    return;
+  }
+
+  @Override
+  public Pair<String, InputStream> getNext() throws IOException {
+    if (name != null) {
+      Pair<String, InputStream> ret =
+          new Pair<String, InputStream>(name, input);
+      name = null;
+      input = null;
+      return ret;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (input != null) {
+        input.close();
+      }
+    } finally {
+      name = null;
+      input = null;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * The default {@link Outputter} that outputs to a plain file. Compression
+ * will be applied if the path has the right suffix.
+ */
+public class DefaultOutputter<T> implements Outputter<T> {
+  JsonObjectMapperWriter<T> writer;
+  Compressor compressor;
+  
+  @Override
+  public void init(Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
+    OutputStream output;
+    if (codec != null) {
+      compressor = CodecPool.getCompressor(codec);
+      output = codec.createOutputStream(fs.create(path), compressor);
+    } else {
+      output = fs.create(path);
+    }
+    writer = new JsonObjectMapperWriter<T>(output, 
+        conf.getBoolean("rumen.output.pretty.print", true));
+  }
+
+  @Override
+  public void output(T object) throws IOException {
+    writer.write(object);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } finally {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * {@link JobHistoryParser} to parse job histories for hadoop 0.20 (META=1).
+ */
+public class Hadoop20JHParser implements JobHistoryParser {
+  final LineReader reader;
+
+  static final String endLineString = " .";
+  static final int internalVersion = 1;
+
+  /**
+   * Can this parser parse the input?
+   * 
+   * @param input
+   * @return Whether this parser can parse the input.
+   * @throws IOException
+   * 
+   *           We will deem a stream to be a good 0.20 job history stream if the
+   *           first line is exactly "Meta VERSION=\"1\" ."
+   */
+  public static boolean canParse(InputStream input) throws IOException {
+    try {
+      LineReader reader = new LineReader(input);
+
+      Text buffer = new Text();
+
+      return reader.readLine(buffer) != 0
+          && buffer.toString().equals("Meta VERSION=\"1\" .");
+    } catch (EOFException e) {
+      return false;
+    }
+  }
+
+  public Hadoop20JHParser(InputStream input) throws IOException {
+    super();
+
+    reader = new LineReader(input);
+  }
+
+  Map<String, HistoryEventEmitter> liveEmitters =
+      new HashMap<String, HistoryEventEmitter>();
+  Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
+
+  enum LineType {
+    JOB("Job", "JOBID") {
+      HistoryEventEmitter createEmitter() {
+        return new Job20LineHistoryEventEmitter();
+      }
+    },
+
+    TASK("Task", "TASKID") {
+      HistoryEventEmitter createEmitter() {
+        return new Task20LineHistoryEventEmitter();
+      }
+    },
+
+    MAP_ATTEMPT("MapAttempt", "TASK_ATTEMPT_ID") {
+      HistoryEventEmitter createEmitter() {
+        return new MapAttempt20LineHistoryEventEmitter();
+      }
+    },
+
+    REDUCE_ATTEMPT("ReduceAttempt", "TASK_ATTEMPT_ID") {
+      HistoryEventEmitter createEmitter() {
+        return new ReduceAttempt20LineHistoryEventEmitter();
+      }
+    };
+
+    private LogRecordType type;
+    private String name;
+
+    LineType(String s, String name) {
+      type = LogRecordType.intern(s);
+      this.name = name;
+    }
+
+    LogRecordType recordType() {
+      return type;
+    }
+
+    String getName(ParsedLine line) {
+      return line.get(name);
+    }
+
+    abstract HistoryEventEmitter createEmitter();
+
+    static LineType findLineType(LogRecordType lrt) {
+      for (LineType lt : LineType.values()) {
+        if (lt.type == lrt) {
+          return lt;
+        }
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  public HistoryEvent nextEvent() {
+    try {
+      while (remainingEvents.isEmpty()) {
+        ParsedLine line = new ParsedLine(getFullLine(), internalVersion);
+        LineType type = LineType.findLineType(line.getType());
+        if (type == null) {
+          continue;
+        }
+        String name = type.getName(line);
+        HistoryEventEmitter emitter = findOrMakeEmitter(name, type);
+        Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> pair =
+            emitter.emitterCore(line, name);
+        if (pair.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
+          liveEmitters.remove(name);
+        }
+        remainingEvents = pair.first();
+      }
+      return remainingEvents.poll();
+    } catch (EOFException e) {
+      return null;
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  HistoryEventEmitter findOrMakeEmitter(String name, LineType type) {
+    HistoryEventEmitter result = liveEmitters.get(name);
+    if (result == null) {
+      result = type.createEmitter();
+      liveEmitters.put(name, result);
+    }
+    return result;
+  }
+
+  private String getOneLine() throws IOException {
+    Text resultText = new Text();
+
+    if (reader.readLine(resultText) == 0) {
+      throw new EOFException("apparent bad line");
+    }
+
+    return resultText.toString();
+  }
+
+  private String getFullLine() throws IOException {
+    String line = getOneLine();
+
+    while (line.length() < endLineString.length()) {
+      line = getOneLine();
+    }
+
+    if (line.endsWith(endLineString)) {
+      return line;
+    }
+
+    StringBuilder sb = new StringBuilder(line);
+
+    String addedLine;
+
+    do {
+      addedLine = getOneLine();
+
+      if (addedLine == null) {
+        return sb.toString();
+      }
+
+      sb.append("\n");
+      sb.append(addedLine);
+    } while (addedLine.length() < endLineString.length()
+        || !endLineString.equals(addedLine.substring(addedLine.length()
+            - endLineString.length())));
+
+    return sb.toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Thu Feb 18 18:43:28 2010
@@ -70,6 +70,7 @@
  * about it. See {@code usage()}, below.
  * 
  */
+@Deprecated
 public class HadoopLogsAnalyzer extends Configured implements Tool {
 
   // output streams
@@ -103,14 +104,15 @@
   /**
    * The regular expression used to parse task attempt IDs in job tracker logs
    */
-  private final static Pattern taskAttemptIDPattern = Pattern
-      .compile(".*_([0-9]+)");
+  private final static Pattern taskAttemptIDPattern =
+      Pattern.compile(".*_([0-9]+)");
 
   private final static Pattern xmlFilePrefix = Pattern.compile("[ \t]*<");
 
   private final static Pattern confFileHeader = Pattern.compile("_conf.xml!!");
 
-  private final Map<String, Pattern> counterPatterns = new HashMap<String, Pattern>();
+  private final Map<String, Pattern> counterPatterns =
+      new HashMap<String, Pattern>();
 
   /**
    * The unpaired job config file. Currently only used to glean the {@code -Xmx}
@@ -188,14 +190,14 @@
   private boolean collectTaskTimes = false;
 
   private LogRecordType canonicalJob = LogRecordType.intern("Job");
-  private LogRecordType canonicalMapAttempt = LogRecordType
-      .intern("MapAttempt");
-  private LogRecordType canonicalReduceAttempt = LogRecordType
-      .intern("ReduceAttempt");
+  private LogRecordType canonicalMapAttempt =
+      LogRecordType.intern("MapAttempt");
+  private LogRecordType canonicalReduceAttempt =
+      LogRecordType.intern("ReduceAttempt");
   private LogRecordType canonicalTask = LogRecordType.intern("Task");
 
-  private static Pattern streamingJobnamePattern = Pattern
-      .compile("streamjob\\d+.jar");
+  private static Pattern streamingJobnamePattern =
+      Pattern.compile("streamjob\\d+.jar");
 
   private HashSet<String> hostNames = new HashSet<String>();
 
@@ -250,8 +252,8 @@
       result[i] = new Histogram[LoggedJob.JobType.values().length];
 
       for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
-        result[i][j] = blockname == null ? new Histogram() : new Histogram(
-            blockname);
+        result[i][j] =
+            blockname == null ? new Histogram() : new Histogram(blockname);
       }
     }
 
@@ -505,8 +507,9 @@
           SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
       JsonFactory jfactory = jmapper.getJsonFactory();
       FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
-      jobTraceGen = jfactory.createJsonGenerator(
-          jobFS.create(jobTraceFilename), JsonEncoding.UTF8);
+      jobTraceGen =
+          jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
+              JsonEncoding.UTF8);
       if (prettyprintTrace) {
         jobTraceGen.useDefaultPrettyPrinter();
       }
@@ -517,8 +520,9 @@
             SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
         JsonFactory tfactory = tmapper.getJsonFactory();
         FileSystem topoFS = topologyFilename.getFileSystem(getConf());
-        topologyGen = tfactory.createJsonGenerator(
-            topoFS.create(topologyFilename), JsonEncoding.UTF8);
+        topologyGen =
+            tfactory.createJsonGenerator(topoFS.create(topologyFilename),
+                JsonEncoding.UTF8);
         topologyGen.useDefaultPrettyPrinter();
       }
     }
@@ -546,7 +550,7 @@
       IOException {
     if (input != null) {
       input.close();
-      LOG.info("File closed: "+currentFileName);
+      LOG.info("File closed: " + currentFileName);
       input = null;
     }
 
@@ -573,7 +577,8 @@
             + inputDirectoryFiles.length + ", starts with line " + lineNumber
             + ".");
 
-    input = maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
+    input =
+        maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
 
     return input != null;
   }
@@ -734,8 +739,9 @@
 
     long[] endpointKeys = taskTimes.getCDF(1000, endpoints);
 
-    int smallResultOffset = (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
-        : 0);
+    int smallResultOffset =
+        (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
+            : 0);
 
     Histogram myTotal = spreadTo[outcome.ordinal()][jtype.ordinal()];
 
@@ -803,14 +809,15 @@
           attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
 
           // initialize all the per-job statistics gathering places
-          successfulMapAttemptTimes = new Histogram[ParsedHost
-              .numberOfDistances() + 1];
+          successfulMapAttemptTimes =
+              new Histogram[ParsedHost.numberOfDistances() + 1];
           for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
             successfulMapAttemptTimes[i] = new Histogram();
           }
 
           successfulReduceAttemptTimes = new Histogram();
-          failedMapAttemptTimes = new Histogram[ParsedHost.numberOfDistances() + 1];
+          failedMapAttemptTimes =
+              new Histogram[ParsedHost.numberOfDistances() + 1];
           for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
             failedMapAttemptTimes[i] = new Histogram();
           }
@@ -851,7 +858,8 @@
           if (finishTime != null) {
             jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
             if (status != null) {
-              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
+              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
+                  .valueOf(status));
             }
 
             maybeMateJobAndConf();
@@ -890,9 +898,9 @@
           if (launchTimeCurrentJob != 0) {
             String jobResultText = line.get("JOB_STATUS");
 
-            JobOutcome thisOutcome = ((jobResultText != null && "SUCCESS"
-                .equals(jobResultText)) ? JobOutcome.SUCCESS
-                : JobOutcome.FAILURE);
+            JobOutcome thisOutcome =
+                ((jobResultText != null && "SUCCESS".equals(jobResultText))
+                    ? JobOutcome.SUCCESS : JobOutcome.FAILURE);
 
             if (submitTimeCurrentJob != 0L) {
               canonicalDistributionsEnter(delayTimeDists, thisOutcome,
@@ -911,8 +919,8 @@
             Histogram currentJobSortTimes = new Histogram();
             Histogram currentJobReduceTimes = new Histogram();
 
-            Iterator<Map.Entry<String, Long>> taskIter = taskAttemptStartTimes
-                .entrySet().iterator();
+            Iterator<Map.Entry<String, Long>> taskIter =
+                taskAttemptStartTimes.entrySet().iterator();
 
             while (taskIter.hasNext()) {
               Map.Entry<String, Long> entry = taskIter.next();
@@ -930,8 +938,8 @@
               }
 
               // Reduce processing
-              Long shuffleEnd = taskReduceAttemptShuffleEndTimes.get(entry
-                  .getKey());
+              Long shuffleEnd =
+                  taskReduceAttemptShuffleEndTimes.get(entry.getKey());
               Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
               Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
 
@@ -1027,7 +1035,9 @@
       Pre21JobHistoryConstants.Values stat;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A task status you don't know about is \"" + status + "\".",
             e);
@@ -1037,22 +1047,26 @@
       task.setTaskStatus(stat);
 
       try {
-        typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
+        typ =
+            taskType == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(taskType);
       } catch (IllegalArgumentException e) {
         LOG.error("A task type you don't know about is \"" + taskType + "\".",
             e);
         typ = null;
       }
-      
+
       if (typ == null) {
         return;
       }
 
       task.setTaskType(typ);
 
-      List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
-          .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
-          .getReduceTasks() : jobBeingTraced.getOtherTasks();
+      List<LoggedTask> vec =
+          typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+              .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE
+              ? jobBeingTraced.getReduceTasks() : jobBeingTraced
+                  .getOtherTasks();
 
       if (!taskAlreadyLogged) {
         vec.add(task);
@@ -1066,8 +1080,8 @@
     Pattern result = counterPatterns.get(counterName);
 
     if (result == null) {
-      String namePatternRegex = "\\[\\(" + counterName
-          + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
+      String namePatternRegex =
+          "\\[\\(" + counterName + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
       result = Pattern.compile(namePatternRegex);
       counterPatterns.put(counterName, result);
     }
@@ -1253,7 +1267,9 @@
       Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1404,7 +1420,9 @@
       Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.warn("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1632,8 +1650,8 @@
         }
 
         for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
-          successAfterI[ent.getKey().intValue()] = ((double) ent.getValue())
-              / totalSuccessfulAttempts;
+          successAfterI[ent.getKey().intValue()] =
+              ((double) ent.getValue()) / totalSuccessfulAttempts;
         }
         jobBeingTraced.setMapperTriesToSucceed(successAfterI);
       } else {
@@ -1712,8 +1730,9 @@
       }
 
       if (spreading) {
-        String ratioDescription = "(" + spreadMax + "/1000 %ile) to ("
-            + spreadMin + "/1000 %ile) scaled by 1000000";
+        String ratioDescription =
+            "(" + spreadMax + "/1000 %ile) to (" + spreadMin
+                + "/1000 %ile) scaled by 1000000";
 
         printDistributionSet(
             "Map task success times " + ratioDescription + ":",
@@ -1737,8 +1756,8 @@
     }
 
     if (topologyGen != null) {
-      LoggedNetworkTopology topo = new LoggedNetworkTopology(allHosts,
-          "<root>", 0);
+      LoggedNetworkTopology topo =
+          new LoggedNetworkTopology(allHosts, "<root>", 0);
       topologyGen.writeObject(topo);
       topologyGen.close();
     }

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+
+abstract class HistoryEventEmitter {
+  static final private Log LOG = LogFactory.getLog(HistoryEventEmitter.class);
+
+  abstract List<SingleEventEmitter> nonFinalSEEs();
+
+  abstract List<SingleEventEmitter> finalSEEs();
+
+  protected HistoryEventEmitter() {
+    // no code
+  }
+
+  enum PostEmitAction {
+    NONE, REMOVE_HEE
+  };
+
+  final Pair<Queue<HistoryEvent>, PostEmitAction> emitterCore(ParsedLine line,
+      String name) {
+    Queue<HistoryEvent> results = new LinkedList<HistoryEvent>();
+    PostEmitAction removeEmitter = PostEmitAction.NONE;
+    for (SingleEventEmitter see : nonFinalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+      }
+    }
+    for (SingleEventEmitter see : finalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+        removeEmitter = PostEmitAction.REMOVE_HEE;
+        break;
+      }
+    }
+    return new Pair<Queue<HistoryEvent>, PostEmitAction>(results, removeEmitter);
+  }
+
+  protected static Counters parseCounters(String counters)
+      throws ParseException {
+    if (counters == null) {
+      LOG.warn("HistoryEventEmitters: null counter detected:");
+      return null;
+    }
+
+    counters = counters.replace("\\.", "\\\\.");
+    counters = counters.replace("\\\\(", "\\(");
+    counters = counters.replace("\\\\)", "\\)");
+
+    org.apache.hadoop.mapred.Counters depForm =
+        org.apache.hadoop.mapred.Counters.fromEscapedCompactString(counters);
+
+    return new Counters(depForm);
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link InputDemuxer} demultiplexes the input files into individual input
+ * streams.
+ */
+public interface InputDemuxer extends Closeable {
+  /**
+   * Bind the {@link InputDemuxer} to a particular file.
+   *
+   * @param path
+   *          The path to the file it should bind to.
+   * @param conf
+   *          Configuration
+   * @throws IOException
+   *           Reserved for read errors.
+   *
+   *           NOTE(review): this comment previously described a boolean
+   *           result (true when the binding succeeds, false when the file can
+   *           be read but is in the wrong format), but the method is declared
+   *           void. How a wrong-format file is reported is not specified
+   *           here -- confirm against the implementations.
+   */
+  public void bindTo(Path path, Configuration conf) throws IOException;
+
+  /**
+   * Get the next &lt;name, input&gt; pair. The name should preserve the
+   * original job history file or job conf file name. The input object should
+   * be closed before calling getNext() again. The old input object would be
+   * invalid after calling getNext() again.
+   *
+   * NOTE(review): the value returned once the input is exhausted is not
+   * stated here -- presumably null; confirm against the implementations.
+   *
+   * @return the next &lt;name, input&gt; pair.
+   * @throws IOException
+   *           declared by the signature; presumably raised on read errors --
+   *           confirm against the implementations.
+   */
+  public Pair<String, InputStream> getNext() throws IOException;
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+
+/**
+ * A {@link HistoryEventEmitter} for job-level history lines. Each nested
+ * {@link SingleEventEmitter} inspects the keys of one {@link ParsedLine} and
+ * returns the matching {@link HistoryEvent}, or null when the line does not
+ * carry the keys that emitter needs.
+ *
+ * The emitters are registered once, in the static initializer, and shared by
+ * all instances; the per-job state lives in {@link #originalSubmitTime}.
+ */
+public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  /** Emitters that may fire any number of times over a job's lines. */
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  /** Emitters whose event marks the end of the job. */
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  /**
+   * Submit time recorded by {@link JobSubmittedEventEmitter}; null until a
+   * line with SUBMIT_TIME has been seen.
+   */
+  Long originalSubmitTime = null;
+
+  static {
+    nonFinals.add(new JobSubmittedEventEmitter());
+    nonFinals.add(new JobPriorityChangeEventEmitter());
+    nonFinals.add(new JobStatusChangedEventEmitter());
+    nonFinals.add(new JobInitedEventEmitter());
+    nonFinals.add(new JobInfoChangeEventEmitter());
+
+    finals.add(new JobUnsuccessfulCompletionEventEmitter());
+    finals.add(new JobFinishedEventEmitter());
+  }
+
+  Job20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /**
+   * Emits a {@link JobSubmittedEvent} for a line that has SUBMIT_TIME, and
+   * stores the parsed time in the owning emitter's
+   * {@code originalSubmitTime}. A missing USER is reported as "nulluser".
+   */
+  private static class JobSubmittedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      // Check the name before parsing it, like the other emitters do.
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String submitTime = line.get("SUBMIT_TIME");
+      String jobConf = line.get("JOBCONF");
+      String user = line.get("USER");
+      String jobName = line.get("JOBNAME");
+
+      if (submitTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+
+        that.originalSubmitTime = Long.parseLong(submitTime);
+
+        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
+            : user, that.originalSubmitTime, jobConf);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobPriorityChangeEvent} for a line that has JOB_PRIORITY.
+   */
+  private static class JobPriorityChangeEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      // Check the name before parsing it, like the other emitters do.
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String priority = line.get("JOB_PRIORITY");
+
+      if (priority != null) {
+        return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobInitedEvent} for a line that has LAUNCH_TIME,
+   * TOTAL_MAPS and TOTAL_REDUCES. JOB_STATUS is passed through and may be
+   * null.
+   */
+  private static class JobInitedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+      String status = line.get("JOB_STATUS");
+      String totalMaps = line.get("TOTAL_MAPS");
+      String totalReduces = line.get("TOTAL_REDUCES");
+
+      if (launchTime != null && totalMaps != null && totalReduces != null) {
+        return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
+            .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobStatusChangedEvent} for any line that has JOB_STATUS.
+   */
+  private static class JobStatusChangedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String status = line.get("JOB_STATUS");
+
+      if (status != null) {
+        return new JobStatusChangedEvent(jobID, status);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobInfoChangeEvent} for a line that has LAUNCH_TIME,
+   * pairing it with the submit time recorded earlier. Nothing is emitted
+   * while no submit time has been recorded.
+   */
+  private static class JobInfoChangeEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+
+      if (launchTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+
+        // originalSubmitTime is a Long that stays null until a SUBMIT_TIME
+        // line has been seen; passing it on would presumably fail on
+        // unboxing (assumes the event takes a primitive long -- confirm).
+        if (that.originalSubmitTime == null) {
+          return null;
+        }
+
+        return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
+            .parseLong(launchTime));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobUnsuccessfulCompletionEvent} for a line whose
+   * JOB_STATUS is present and is not "success" (ignoring case), provided
+   * FINISH_TIME, FINISHED_MAPS and FINISHED_REDUCES are present too.
+   */
+  private static class JobUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      if (status != null && !status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null) {
+        return new JobUnsuccessfulCompletionEvent(jobID, Long
+            .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
+            .parseInt(finishedReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link JobFinishedEvent} for a line whose JOB_STATUS is
+   * "success" (ignoring case), provided FINISH_TIME, FINISHED_MAPS,
+   * FINISHED_REDUCES, FAILED_MAPS and FAILED_REDUCES are all present.
+   * Returns null when the COUNTERS value cannot be parsed.
+   */
+  private static class JobFinishedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      String failedMaps = line.get("FAILED_MAPS");
+      String failedReduces = line.get("FAILED_REDUCES");
+
+      String counters = line.get("COUNTERS");
+
+      // failedMaps and failedReduces are parsed below, so they must be part
+      // of the guard: Integer.parseInt(null) throws NumberFormatException.
+      if (status != null && status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null && failedMaps != null
+          && failedReduces != null) {
+        try {
+          return new JobFinishedEvent(jobID, Long.parseLong(finishTime),
+              Integer.parseInt(finishedMaps),
+              Integer.parseInt(finishedReduces), Integer.parseInt(failedMaps),
+              Integer.parseInt(failedReduces), null, null,
+              parseCounters(counters));
+        } catch (ParseException e) {
+          // Best effort: a line with unparseable counters yields no event.
+          return null;
+        }
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}