Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/02/18 19:43:30 UTC
svn commit: r911519 [1/3] - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/jobhistory/
src/test/mapred/org/apache/hadoop/tools/rumen/
src/test/tools/data/rumen/small-trace-test/
src/tools/org/apache/hadoop/tools/rumen/
Author: cdouglas
Date: Thu Feb 18 18:43:28 2010
New Revision: 911519
URL: http://svn.apache.org/viewvc?rev=911519&view=rev
Log:
MAPREDUCE-1309. Refactor Rumen trace generator to improve code structure
and add extensible support for log formats. Contributed by Dick King
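A minimal usage sketch of the new extension point, distilled from testRumenViaDispatch() in the diff below: TraceBuilder runs as a Tool, and its -demuxer option selects the log-format adapter. All paths here are placeholders, not part of the commit.

    // Sketch, assuming only the TraceBuilder/ToolRunner usage exercised
    // in testRumenViaDispatch() below; every path is hypothetical.
    String[] args =
        { "-demuxer", ConcatenatedInputFilesDemuxer.class.getName(),
          "file:///tmp/trace.json",     // job trace output
          "file:///tmp/topology.json",  // network topology output
          "file:///tmp/jt-log.gz" };    // job tracker log input
    int result = ToolRunner.run(new TraceBuilder(), args);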
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz (with props)
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz (with props)
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz (with props)
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz (with props)
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz (with props)
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Feb 18 18:43:28 2010
@@ -177,6 +177,9 @@
MAPREDUCE-1445. Refactor Sqoop tests to support better ConnManager testing.
(Aaron Kimball via tomwhite)
+ MAPREDUCE-1309. Refactor Rumen trace generator to improve code structure
+ and add extensible support for log formats. (Dick King via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Thu Feb 18 18:43:28 2010
@@ -64,8 +64,9 @@
this.in = in;
this.version = in.readLine();
- if (!EventWriter.VERSION.equals(version))
+ if (!EventWriter.VERSION.equals(version)) {
throw new IOException("Incompatible event log version: "+version);
+ }
this.schema = Schema.parse(in.readLine());
this.reader = new SpecificDatumReader(schema);
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class ConcatenatedInputFilesDemuxer implements InputDemuxer {
+ private String name;
+ private DelimitedInputStream input;
+
+ private String knownNextFileName = null;
+
+ static private final int MAXIMUM_HEADER_LINE_LENGTH = 500;
+
+ @Override
+ public void bindTo(Path path, Configuration conf) throws IOException {
+ InputStream underlyingInput = null;
+
+ if (name != null) { // re-binding before the previous one was consumed.
+ close();
+ }
+ name = path.getName();
+
+ underlyingInput = new PossiblyDecompressedInputStream(path, conf);
+
+ input =
+ new DelimitedInputStream(new BufferedInputStream(underlyingInput),
+ "\f!!FILE=", "!!\n");
+
+ knownNextFileName = input.nextFileName();
+
+ if (knownNextFileName == null) {
+ close();
+
+ return;
+ }
+
+ /*
+ * We handle files in specialized formats by trying their demuxers first,
+ * not by failing here.
+ */
+ return;
+ }
+
+ @Override
+ public Pair<String, InputStream> getNext() throws IOException {
+ if (knownNextFileName != null) {
+ Pair<String, InputStream> result =
+ new Pair<String, InputStream>(knownNextFileName, input);
+
+ knownNextFileName = null;
+
+ return result;
+ }
+
+ String nextFileName = input.nextFileName();
+
+ if (nextFileName == null) {
+ return null;
+ }
+
+ return new Pair<String, InputStream>(nextFileName, input);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (input != null) {
+ input.close();
+ }
+ }
+
+ /**
+ * A simple wrapper class to make any input stream delimited. It has an
+ * extra method, nextFileName.
+ *
+ * The input stream should contain header lines of the form
+ * <marker><filename><endmarker>. The text <marker> should not occur
+ * elsewhere in the file. The text <endmarker> should not occur in a file
+ * name.
+ */
+ static class DelimitedInputStream extends InputStream {
+ private InputStream input;
+
+ private boolean endSeen = false;
+
+ private final String fileMarker;
+
+ private final byte[] markerBytes;
+
+ private final byte[] fileMarkerBuffer;
+
+ private final String fileEndMarker;
+
+ private final byte[] endMarkerBytes;
+
+ private final byte[] fileEndMarkerBuffer;
+
+ /**
+ * Constructor.
+ *
+ * @param input
+ */
+ public DelimitedInputStream(InputStream input, String fileMarker,
+ String fileEndMarker) {
+ this.input = new BufferedInputStream(input, 10000);
+ this.input.mark(10000);
+ this.fileMarker = fileMarker;
+ this.markerBytes = this.fileMarker.getBytes();
+ this.fileMarkerBuffer = new byte[this.markerBytes.length];
+ this.fileEndMarker = fileEndMarker;
+ this.endMarkerBytes = this.fileEndMarker.getBytes();
+ this.fileEndMarkerBuffer = new byte[this.endMarkerBytes.length];
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (endSeen) {
+ return -1;
+ }
+
+ input.mark(10000);
+
+ int result = input.read();
+
+ if (result < 0) {
+ endSeen = true;
+ return result;
+ }
+
+ if (result == markerBytes[0]) {
+ input.reset();
+
+ // this might be a marker line
+ int markerReadResult =
+ input.read(fileMarkerBuffer, 0, fileMarkerBuffer.length);
+
+ input.reset();
+
+ if (markerReadResult < fileMarkerBuffer.length
+ || !fileMarker.equals(new String(fileMarkerBuffer))) {
+ return input.read();
+ }
+
+ return -1;
+ }
+
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#read(byte[], int, int)
+ *
+ * This does SLIGHTLY THE WRONG THING.
+ *
+ * If we run off the end of the segment then the input buffer will be
+ * dirtied beyond the point where we claim to have read. If this turns out
+ * to be a problem, save that data somewhere and restore it if needed.
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (endSeen) {
+ return -1;
+ }
+
+ input.mark(length + markerBytes.length + 10);
+
+ int dataSeen = input.read(buffer, offset, length);
+
+ byte[] extraReadBuffer = null;
+ int extraActualRead = -1;
+
+ // search for an instance of a file marker
+ for (int i = offset; i < offset + dataSeen; ++i) {
+ if (buffer[i] == markerBytes[0]) {
+ boolean mismatch = false;
+
+ for (int j = 1; j < Math.min(markerBytes.length, offset + dataSeen
+ - i); ++j) {
+ if (buffer[i + j] != markerBytes[j]) {
+ mismatch = true;
+ break;
+ }
+ }
+
+ if (!mismatch) {
+ // see if we have only a prefix of the markerBytes
+ int uncheckedMarkerCharCount =
+ markerBytes.length - (offset + dataSeen - i);
+
+ if (uncheckedMarkerCharCount > 0) {
+ if (extraReadBuffer == null) {
+ extraReadBuffer = new byte[markerBytes.length - 1];
+
+ extraActualRead = input.read(extraReadBuffer);
+ }
+
+ if (extraActualRead < uncheckedMarkerCharCount) {
+ input.reset();
+ return input.read(buffer, offset, length);
+ }
+
+ for (int j = 0; j < uncheckedMarkerCharCount; ++j) {
+ if (extraReadBuffer[j] != markerBytes[markerBytes.length
+ - uncheckedMarkerCharCount + j]) {
+ input.reset();
+ return input.read(buffer, offset, length);
+ }
+ }
+ }
+
+ input.reset();
+
+ if (i == offset) {
+ return -1;
+ }
+
+ int result = input.read(buffer, offset, i - offset);
+ return result;
+ }
+ }
+ }
+
+ return dataSeen;
+ }
+
+ @Override
+ public int read(byte[] buffer) throws IOException {
+ return read(buffer, 0, buffer.length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (endSeen) {
+ input.close();
+ }
+ }
+
+ String nextFileName() throws IOException {
+ return nextFileName(MAXIMUM_HEADER_LINE_LENGTH);
+ }
+
+ private String nextFileName(int bufferSize) throws IOException {
+ // the line can't contain a newline and must contain a form feed
+ byte[] buffer = new byte[bufferSize];
+
+ input.mark(bufferSize + 1);
+
+ int actualRead = input.read(buffer);
+ int mostRecentRead = actualRead;
+
+ while (actualRead < bufferSize && mostRecentRead > 0) {
+ mostRecentRead =
+ input.read(buffer, actualRead, bufferSize - actualRead);
+
+ if (mostRecentRead > 0) {
+ actualRead += mostRecentRead;
+ }
+ }
+
+ if (actualRead < markerBytes.length) {
+ input.reset();
+ return null;
+ }
+
+ for (int i = 0; i < markerBytes.length; ++i) {
+ if (markerBytes[i] != buffer[i]) {
+ input.reset();
+ return null;
+ }
+ }
+
+ for (int i = markerBytes.length; i < actualRead; ++i) {
+ if (buffer[i] == endMarkerBytes[0]) {
+ // gather the file name
+ input.reset();
+ // burn the marker
+ if (input.read(buffer, 0, markerBytes.length) < markerBytes.length) {
+ throw new IOException("Can't reread bytes I've read before.");
+ }
+ // get the file name
+ if (input.read(buffer, 0, i - markerBytes.length) < i
+ - markerBytes.length) {
+ throw new IOException("Can't reread bytes I've read before.");
+ }
+ // burn the two exclamation points and the newline
+ if (input.read(fileEndMarkerBuffer) < fileEndMarkerBuffer.length) {
+ input.reset();
+ return null;
+ }
+ for (int j = 0; j < endMarkerBytes.length; ++j) {
+ if (endMarkerBytes[j] != fileEndMarkerBuffer[j]) {
+ input.reset();
+ return null;
+ }
+ }
+
+ return new String(buffer, 0, i - markerBytes.length);
+ }
+
+ if (buffer[i] == '\n') {
+ return null;
+ }
+ }
+
+ // we ran off the end. Was the buffer too short, or is this all there was?
+ input.reset();
+
+ if (actualRead < bufferSize) {
+ return null;
+ }
+
+ return nextFileName(bufferSize * 2);
+ }
+ }
+
+}
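A hedged sketch of how the demuxer above is consumed, assuming only the InputDemuxer methods it implements (bindTo, getNext, close). Each embedded file is announced by a header of the form \f!!FILE=<name>!!, the markers handed to DelimitedInputStream in bindTo(); the input path below is a placeholder.

    // Sketch: iterate the (file name, segment stream) pairs of a
    // concatenated input. The path is hypothetical.
    InputDemuxer demuxer = new ConcatenatedInputFilesDemuxer();
    demuxer.bindTo(new Path("file:///tmp/concatenated-logs.gz"),
        new Configuration());
    Pair<String, InputStream> segment;
    while ((segment = demuxer.getNext()) != null) {
      System.out.println("embedded file: " + segment.first());
      // segment.second() returns -1 at the next file marker, not at
      // the physical end of the underlying stream.
    }
    demuxer.close();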
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Thu Feb 18 18:43:28 2010
@@ -18,13 +18,33 @@
package org.apache.hadoop.tools.rumen;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
@@ -49,11 +69,11 @@
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir =
- new Path(System.getProperty("test.tools.input.dir", ""))
- .makeQualified(lfs);
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
final Path rootTempDir =
- new Path(System.getProperty("test.build.data", "/tmp"))
- .makeQualified(lfs);
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
@@ -82,25 +102,317 @@
final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
final Path traceGoldFile = new Path(rootInputFile, goldTrace);
+ @SuppressWarnings("deprecation")
HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
TestRumenJobTraces
- .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+ .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyFile,
+ topologyGoldFile, LoggedNetworkTopology.class, "topology");
+ TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, traceFile,
+ traceGoldFile, LoggedJob.class, "trace");
+ }
+
+ @Test
+ public void testRumenViaDispatch() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+ lfs.delete(tempDir, true);
+
+ final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+ final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+ final Path inputPath =
+ new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz");
+
+ System.out.println("topology result file = " + topologyPath);
+ System.out.println("trace result file = " + tracePath);
+
+ String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName();
+
+ String[] args =
+ { "-demuxer", demuxerClassName, tracePath.toString(),
+ topologyPath.toString(), inputPath.toString() };
+
+ final Path topologyGoldFile =
+ new Path(rootInputPath, "dispatch-topology-output.json.gz");
+ final Path traceGoldFile =
+ new Path(rootInputPath, "dispatch-trace-output.json.gz");
+
+ Tool analyzer = new TraceBuilder();
+ int result = ToolRunner.run(analyzer, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ TestRumenJobTraces
+ .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyPath,
topologyGoldFile, LoggedNetworkTopology.class, "topology");
- TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+ TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
traceGoldFile, LoggedJob.class, "trace");
}
+ @Test
+ public void testHadoop20JHParser() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ boolean success = false;
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+ lfs.delete(tempDir, true);
+
+ final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
+ final Path goldPath =
+ new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
+
+ InputStream inputLogStream =
+ new PossiblyDecompressedInputStream(inputPath, conf);
+
+ InputStream inputGoldStream =
+ new PossiblyDecompressedInputStream(goldPath, conf);
+
+ BufferedInputStream bis = new BufferedInputStream(inputLogStream);
+ bis.mark(10000);
+ Hadoop20JHParser parser = new Hadoop20JHParser(bis);
+
+ final Path resultPath = new Path(tempDir, "result.text");
+
+ System.out.println("testHadoop20JHParser sent its output to " + resultPath);
+
+ Compressor compressor;
+
+ FileSystem fs = resultPath.getFileSystem(conf);
+ CompressionCodec codec =
+ new CompressionCodecFactory(conf).getCodec(resultPath);
+ OutputStream output;
+ if (codec != null) {
+ compressor = CodecPool.getCompressor(codec);
+ output = codec.createOutputStream(fs.create(resultPath), compressor);
+ } else {
+ output = fs.create(resultPath);
+ }
+
+ PrintStream printStream = new PrintStream(output);
+
+ try {
+ assertEquals("Hadoop20JHParser can't parse the test file", true,
+ Hadoop20JHParser.canParse(inputLogStream));
+
+ bis.reset();
+
+ HistoryEvent event = parser.nextEvent();
+
+ while (event != null) {
+ printStream.println(event.getClass().getCanonicalName());
+ event = parser.nextEvent();
+ }
+
+ printStream.close();
+
+ LineReader goldLines = new LineReader(inputGoldStream);
+ LineReader resultLines =
+ new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
+
+ int lineNumber = 1;
+
+ try {
+ Text goldLine = new Text();
+ Text resultLine = new Text();
+
+ int goldRead = goldLines.readLine(goldLine);
+ int resultRead = resultLines.readLine(resultLine);
+
+ while (goldRead * resultRead != 0) {
+ if (!goldLine.equals(resultLine)) {
+ assertEquals("Type mismatch detected", goldLine, resultLine);
+ break;
+ }
+
+ goldRead = goldLines.readLine(goldLine);
+ resultRead = resultLines.readLine(resultLine);
+
+ ++lineNumber;
+ }
+
+ if (goldRead != resultRead) {
+ assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
+ + " file contains more text at line " + lineNumber, goldRead,
+ resultRead);
+ }
+
+ success = true;
+ } finally {
+ goldLines.close();
+ resultLines.close();
+
+ if (success) {
+ lfs.delete(resultPath, false);
+ }
+ }
+
+ } finally {
+ if (parser == null) {
+ inputLogStream.close();
+ } else {
+ parser.close();
+ }
+
+ if (inputGoldStream != null) {
+ inputGoldStream.close();
+ }
+
+ // it's okay to do this twice [if we get an error on input]
+ printStream.close();
+ }
+ }
+
+ @Test
+ public void testJobConfigurationParser() throws Exception {
+ String[] list1 =
+ { "mapred.job.queue.name", "mapreduce.job.name",
+ "mapred.child.java.opts" };
+
+ String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
+
+ List<String> interested1 = new ArrayList<String>();
+ for (String interested : list1) {
+ interested1.add(interested);
+ }
+
+ List<String> interested2 = new ArrayList<String>();
+ for (String interested : list2) {
+ interested2.add(interested);
+ }
+
+ JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
+ JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
+
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ @SuppressWarnings("deprecation")
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs);
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+
+ final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
+
+ InputStream inputConfStream =
+ new PossiblyDecompressedInputStream(inputPath, conf);
+
+ try {
+ Properties props1 = jcp1.parse(inputConfStream);
+ inputConfStream.close();
+
+ inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
+ Properties props2 = jcp2.parse(inputConfStream);
+
+ assertEquals("testJobConfigurationParser: wrong number of properties", 3,
+ props1.size());
+ assertEquals("testJobConfigurationParser: wrong number of properties", 2,
+ props2.size());
+
+ assertEquals("prop test 1", "TheQueue", props1
+ .get("mapred.job.queue.name"));
+ assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
+ assertEquals("prop test 3",
+ "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
+ .get("mapred.child.java.opts"));
+ assertEquals("prop test 4", "TheQueue", props2
+ .get("mapred.job.queue.name"));
+ assertEquals("prop test 5",
+ "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
+ .get("mapred.child.java.opts"));
+
+ } finally {
+ inputConfStream.close();
+ }
+ }
+
+ @Test
+ public void testTopologyBuilder() throws Exception {
+ final TopologyBuilder subject = new TopologyBuilder();
+
+ // currently we extract no host names from the Properties
+ subject.process(new Properties());
+
+ subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_0"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+ "SUCCESS", null));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_1"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED"));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_2"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED"));
+ subject.process(new TaskStartedEvent(TaskID
+ .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
+ .valueOf("MAP"),
+ "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com"));
+
+ final LoggedNetworkTopology topology = subject.build();
+
+ List<LoggedNetworkTopology> racks = topology.getChildren();
+
+ assertEquals("Wrong number of racks", 2, racks.size());
+
+ boolean sawSingleton = false;
+ boolean sawDoubleton = false;
+
+ for (LoggedNetworkTopology rack : racks) {
+ List<LoggedNetworkTopology> nodes = rack.getChildren();
+ if (rack.getName().endsWith(".64")) {
+ assertEquals("The singleton rack has the wrong number of elements", 1,
+ nodes.size());
+ sawSingleton = true;
+ } else if (rack.getName().endsWith(".80")) {
+ assertEquals("The doubleton rack has the wrong number of elements", 2,
+ nodes.size());
+ sawDoubleton = true;
+ } else {
+ assertTrue("Unrecognized rack name", false);
+ }
+ }
+
+ assertTrue("Did not see singleton rack", sawSingleton);
+ assertTrue("Did not see doubleton rack", sawDoubleton);
+ }
+
static private <T extends DeepCompare> void jsonFileMatchesGold(
- FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+ Configuration conf, Path result, Path gold, Class<? extends T> clazz,
String fileDescription) throws IOException {
JsonObjectMapperParser<T> goldParser =
- new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
- InputStream resultStream = lfs.open(result);
+ new JsonObjectMapperParser<T>(gold, clazz, conf);
JsonObjectMapperParser<T> resultParser =
- new JsonObjectMapperParser<T>(resultStream, clazz);
+ new JsonObjectMapperParser<T>(result, clazz, conf);
try {
while (true) {
DeepCompare goldJob = goldParser.getNext();
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml (added)
+++ hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml Thu Feb 18 18:43:28 2010
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>mapred.job.queue.name</name><value>TheQueue</value>
+ </property>
+ <property>
+ <name>mapreduce.job.name</name><value>job_0001</value>
+ </property>
+ <property>
+ <name>maproduce.uninteresting.property</name><value>abcdef</value>
+ </property>
+ <property><name>mapred.child.java.opts</name><value>-server -Xmx640m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz?rev=911519&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+
+/**
+ * {@link JobHistoryParser} that parses job history files produced by
+ * {@link org.apache.hadoop.mapreduce.jobhistory.JobHistory} from the same
+ * source code tree as Rumen.
+ */
+public class CurrentJHParser implements JobHistoryParser {
+ private EventReader reader;
+
+ private static class ForkedDataInputStream extends DataInputStream {
+ ForkedDataInputStream(InputStream input) {
+ super(input);
+ }
+
+ @Override
+ public void close() {
+ // no code
+ }
+ }
+
+ /**
+ * Can this parser parse the input?
+ *
+ * @param input
+ * @return Whether this parser can parse the input.
+ * @throws IOException
+ */
+ public static boolean canParse(InputStream input) throws IOException {
+ final DataInputStream in = new ForkedDataInputStream(input);
+
+ try {
+ final EventReader reader = new EventReader(in);
+
+ try {
+ reader.getNextEvent();
+ } catch (IOException e) {
+ return false;
+ } finally {
+ reader.close();
+ }
+ } catch (IOException e) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public CurrentJHParser(InputStream input) throws IOException {
+ reader = new EventReader(new DataInputStream(input));
+ }
+
+ @Override
+ public HistoryEvent nextEvent() throws IOException {
+ return reader.getNextEvent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+}
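A short usage sketch for the parser above, assuming only canParse(), nextEvent() and close(); the path is a placeholder. Note that canParse() consumes the head of the stream, so a caller that wants to probe and then parse should use a stream it can rewind (the tests use BufferedInputStream with mark()/reset(); the commit also adds RewindableInputStream).

    // Sketch: list the event classes in a current-format history file.
    InputStream in = new PossiblyDecompressedInputStream(
        new Path("file:///tmp/job-history.log"), conf);  // hypothetical
    JobHistoryParser parser = new CurrentJHParser(in);
    try {
      for (HistoryEvent e = parser.nextEvent(); e != null;
           e = parser.nextEvent()) {
        System.out.println(e.getClass().getCanonicalName());
      }
    } finally {
      parser.close();
    }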
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link DefaultInputDemuxer} acts as a pass-through demuxer. It simply opens
+ * each file and returns the input stream. If the input is compressed, it
+ * returns a decompressing stream.
+ */
+public class DefaultInputDemuxer implements InputDemuxer {
+ String name;
+ InputStream input;
+
+ @Override
+ public void bindTo(Path path, Configuration conf) throws IOException {
+ if (name != null) { // re-binding before the previous one was consumed.
+ close();
+ }
+ name = path.getName();
+
+ input = new PossiblyDecompressedInputStream(path, conf);
+
+ return;
+ }
+
+ @Override
+ public Pair<String, InputStream> getNext() throws IOException {
+ if (name != null) {
+ Pair<String, InputStream> ret =
+ new Pair<String, InputStream>(name, input);
+ name = null;
+ input = null;
+ return ret;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (input != null) {
+ input.close();
+ }
+ } finally {
+ name = null;
+ input = null;
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * The default {@link Outputter} that outputs to a plain file. Compression
+ * will be applied if the path has the right suffix.
+ */
+public class DefaultOutputter<T> implements Outputter<T> {
+ JsonObjectMapperWriter<T> writer;
+ Compressor compressor;
+
+ @Override
+ public void init(Path path, Configuration conf) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
+ OutputStream output;
+ if (codec != null) {
+ compressor = CodecPool.getCompressor(codec);
+ output = codec.createOutputStream(fs.create(path), compressor);
+ } else {
+ output = fs.create(path);
+ }
+ writer = new JsonObjectMapperWriter<T>(output,
+ conf.getBoolean("rumen.output.pretty.print", true));
+ }
+
+ @Override
+ public void output(T object) throws IOException {
+ writer.write(object);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ writer.close();
+ } finally {
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
+ }
+ }
+}
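A brief lifecycle sketch for the class above, assuming only the Outputter methods it implements; the path and record are placeholders. Choosing a path with a compression suffix is what triggers the codec branch in init().

    // Sketch: write one record as JSON, gzipped because of the suffix.
    Outputter<LoggedJob> outputter = new DefaultOutputter<LoggedJob>();
    Configuration conf = new Configuration();
    conf.setBoolean("rumen.output.pretty.print", false); // defaults to true
    outputter.init(new Path("file:///tmp/trace.json.gz"), conf);
    outputter.output(job);  // a hypothetical LoggedJob instance
    outputter.close();      // also returns any borrowed Compressor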
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * {@link JobHistoryParser} to parse job histories for Hadoop 0.20 (Meta VERSION=1).
+ */
+public class Hadoop20JHParser implements JobHistoryParser {
+ final LineReader reader;
+
+ static final String endLineString = " .";
+ static final int internalVersion = 1;
+
+ /**
+ * Can this parser parse the input?
+ *
+ * @param input
+ * @return Whether this parser can parse the input.
+ * @throws IOException
+ *
+ * We will deem a stream to be a good 0.20 job history stream if the
+ * first line is exactly "Meta VERSION=\"1\" ."
+ */
+ public static boolean canParse(InputStream input) throws IOException {
+ try {
+ LineReader reader = new LineReader(input);
+
+ Text buffer = new Text();
+
+ return reader.readLine(buffer) != 0
+ && buffer.toString().equals("Meta VERSION=\"1\" .");
+ } catch (EOFException e) {
+ return false;
+ }
+ }
+
+ public Hadoop20JHParser(InputStream input) throws IOException {
+ super();
+
+ reader = new LineReader(input);
+ }
+
+ Map<String, HistoryEventEmitter> liveEmitters =
+ new HashMap<String, HistoryEventEmitter>();
+ Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
+
+ enum LineType {
+ JOB("Job", "JOBID") {
+ HistoryEventEmitter createEmitter() {
+ return new Job20LineHistoryEventEmitter();
+ }
+ },
+
+ TASK("Task", "TASKID") {
+ HistoryEventEmitter createEmitter() {
+ return new Task20LineHistoryEventEmitter();
+ }
+ },
+
+ MAP_ATTEMPT("MapAttempt", "TASK_ATTEMPT_ID") {
+ HistoryEventEmitter createEmitter() {
+ return new MapAttempt20LineHistoryEventEmitter();
+ }
+ },
+
+ REDUCE_ATTEMPT("ReduceAttempt", "TASK_ATTEMPT_ID") {
+ HistoryEventEmitter createEmitter() {
+ return new ReduceAttempt20LineHistoryEventEmitter();
+ }
+ };
+
+ private LogRecordType type;
+ private String name;
+
+ LineType(String s, String name) {
+ type = LogRecordType.intern(s);
+ this.name = name;
+ }
+
+ LogRecordType recordType() {
+ return type;
+ }
+
+ String getName(ParsedLine line) {
+ return line.get(name);
+ }
+
+ abstract HistoryEventEmitter createEmitter();
+
+ static LineType findLineType(LogRecordType lrt) {
+ for (LineType lt : LineType.values()) {
+ if (lt.type == lrt) {
+ return lt;
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public HistoryEvent nextEvent() {
+ try {
+ while (remainingEvents.isEmpty()) {
+ ParsedLine line = new ParsedLine(getFullLine(), internalVersion);
+ LineType type = LineType.findLineType(line.getType());
+ if (type == null) {
+ continue;
+ }
+ String name = type.getName(line);
+ HistoryEventEmitter emitter = findOrMakeEmitter(name, type);
+ Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> pair =
+ emitter.emitterCore(line, name);
+ if (pair.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
+ liveEmitters.remove(name);
+ }
+ remainingEvents = pair.first();
+ }
+ return remainingEvents.poll();
+ } catch (EOFException e) {
+ return null;
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ HistoryEventEmitter findOrMakeEmitter(String name, LineType type) {
+ HistoryEventEmitter result = liveEmitters.get(name);
+ if (result == null) {
+ result = type.createEmitter();
+ liveEmitters.put(name, result);
+ }
+ return result;
+ }
+
+ private String getOneLine() throws IOException {
+ Text resultText = new Text();
+
+ if (reader.readLine(resultText) == 0) {
+ throw new EOFException("apparent bad line");
+ }
+
+ return resultText.toString();
+ }
+
+ private String getFullLine() throws IOException {
+ String line = getOneLine();
+
+ while (line.length() < endLineString.length()) {
+ line = getOneLine();
+ }
+
+ if (line.endsWith(endLineString)) {
+ return line;
+ }
+
+ StringBuilder sb = new StringBuilder(line);
+
+ String addedLine;
+
+ do {
+ addedLine = getOneLine();
+
+ if (addedLine == null) {
+ return sb.toString();
+ }
+
+ sb.append("\n");
+ sb.append(addedLine);
+ } while (addedLine.length() < endLineString.length()
+ || !endLineString.equals(addedLine.substring(addedLine.length()
+ - endLineString.length())));
+
+ return sb.toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+}
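Since canParse() reads the first line, probing and parsing the same stream requires a rewind, which is how testHadoop20JHParser earlier in this commit does it. A condensed sketch of that pattern, with a hypothetical path:

    // Sketch: probe, rewind, then parse a 0.20 job history log.
    BufferedInputStream in = new BufferedInputStream(
        new PossiblyDecompressedInputStream(
            new Path("file:///tmp/v20-jt-log.gz"), conf));
    in.mark(10000);
    if (Hadoop20JHParser.canParse(in)) {  // consumes 'Meta VERSION="1" .'
      in.reset();                         // rewind to the first line
      JobHistoryParser parser = new Hadoop20JHParser(in);
      try {
        for (HistoryEvent e = parser.nextEvent(); e != null;
             e = parser.nextEvent()) {
          // handle the event
        }
      } finally {
        parser.close();
      }
    }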
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Thu Feb 18 18:43:28 2010
@@ -70,6 +70,7 @@
* about it. See {@code usage()}, below.
*
*/
+@Deprecated
public class HadoopLogsAnalyzer extends Configured implements Tool {
// output streams
@@ -103,14 +104,15 @@
/**
* The regular expression used to parse task attempt IDs in job tracker logs
*/
- private final static Pattern taskAttemptIDPattern = Pattern
- .compile(".*_([0-9]+)");
+ private final static Pattern taskAttemptIDPattern =
+ Pattern.compile(".*_([0-9]+)");
private final static Pattern xmlFilePrefix = Pattern.compile("[ \t]*<");
private final static Pattern confFileHeader = Pattern.compile("_conf.xml!!");
- private final Map<String, Pattern> counterPatterns = new HashMap<String, Pattern>();
+ private final Map<String, Pattern> counterPatterns =
+ new HashMap<String, Pattern>();
/**
* The unpaired job config file. Currently only used to glean the {@code -Xmx}
@@ -188,14 +190,14 @@
private boolean collectTaskTimes = false;
private LogRecordType canonicalJob = LogRecordType.intern("Job");
- private LogRecordType canonicalMapAttempt = LogRecordType
- .intern("MapAttempt");
- private LogRecordType canonicalReduceAttempt = LogRecordType
- .intern("ReduceAttempt");
+ private LogRecordType canonicalMapAttempt =
+ LogRecordType.intern("MapAttempt");
+ private LogRecordType canonicalReduceAttempt =
+ LogRecordType.intern("ReduceAttempt");
private LogRecordType canonicalTask = LogRecordType.intern("Task");
- private static Pattern streamingJobnamePattern = Pattern
- .compile("streamjob\\d+.jar");
+ private static Pattern streamingJobnamePattern =
+ Pattern.compile("streamjob\\d+.jar");
private HashSet<String> hostNames = new HashSet<String>();
@@ -250,8 +252,8 @@
result[i] = new Histogram[LoggedJob.JobType.values().length];
for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
- result[i][j] = blockname == null ? new Histogram() : new Histogram(
- blockname);
+ result[i][j] =
+ blockname == null ? new Histogram() : new Histogram(blockname);
}
}
@@ -505,8 +507,9 @@
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory jfactory = jmapper.getJsonFactory();
FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
- jobTraceGen = jfactory.createJsonGenerator(
- jobFS.create(jobTraceFilename), JsonEncoding.UTF8);
+ jobTraceGen =
+ jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
+ JsonEncoding.UTF8);
if (prettyprintTrace) {
jobTraceGen.useDefaultPrettyPrinter();
}
@@ -517,8 +520,9 @@
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory tfactory = tmapper.getJsonFactory();
FileSystem topoFS = topologyFilename.getFileSystem(getConf());
- topologyGen = tfactory.createJsonGenerator(
- topoFS.create(topologyFilename), JsonEncoding.UTF8);
+ topologyGen =
+ tfactory.createJsonGenerator(topoFS.create(topologyFilename),
+ JsonEncoding.UTF8);
topologyGen.useDefaultPrettyPrinter();
}
}
@@ -546,7 +550,7 @@
IOException {
if (input != null) {
input.close();
- LOG.info("File closed: "+currentFileName);
+ LOG.info("File closed: " + currentFileName);
input = null;
}
@@ -573,7 +577,8 @@
+ inputDirectoryFiles.length + ", starts with line " + lineNumber
+ ".");
- input = maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
+ input =
+ maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
return input != null;
}
@@ -734,8 +739,9 @@
long[] endpointKeys = taskTimes.getCDF(1000, endpoints);
- int smallResultOffset = (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
- : 0);
+ int smallResultOffset =
+ (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
+ : 0);
Histogram myTotal = spreadTo[outcome.ordinal()][jtype.ordinal()];
@@ -803,14 +809,15 @@
attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
// initialize all the per-job statistics gathering places
- successfulMapAttemptTimes = new Histogram[ParsedHost
- .numberOfDistances() + 1];
+ successfulMapAttemptTimes =
+ new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
successfulMapAttemptTimes[i] = new Histogram();
}
successfulReduceAttemptTimes = new Histogram();
- failedMapAttemptTimes = new Histogram[ParsedHost.numberOfDistances() + 1];
+ failedMapAttemptTimes =
+ new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
failedMapAttemptTimes[i] = new Histogram();
}
@@ -851,7 +858,8 @@
if (finishTime != null) {
jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
if (status != null) {
- jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
+ jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
+ .valueOf(status));
}
maybeMateJobAndConf();
@@ -890,9 +898,9 @@
if (launchTimeCurrentJob != 0) {
String jobResultText = line.get("JOB_STATUS");
- JobOutcome thisOutcome = ((jobResultText != null && "SUCCESS"
- .equals(jobResultText)) ? JobOutcome.SUCCESS
- : JobOutcome.FAILURE);
+ JobOutcome thisOutcome =
+ ((jobResultText != null && "SUCCESS".equals(jobResultText))
+ ? JobOutcome.SUCCESS : JobOutcome.FAILURE);
if (submitTimeCurrentJob != 0L) {
canonicalDistributionsEnter(delayTimeDists, thisOutcome,
@@ -911,8 +919,8 @@
Histogram currentJobSortTimes = new Histogram();
Histogram currentJobReduceTimes = new Histogram();
- Iterator<Map.Entry<String, Long>> taskIter = taskAttemptStartTimes
- .entrySet().iterator();
+ Iterator<Map.Entry<String, Long>> taskIter =
+ taskAttemptStartTimes.entrySet().iterator();
while (taskIter.hasNext()) {
Map.Entry<String, Long> entry = taskIter.next();
@@ -930,8 +938,8 @@
}
// Reduce processing
- Long shuffleEnd = taskReduceAttemptShuffleEndTimes.get(entry
- .getKey());
+ Long shuffleEnd =
+ taskReduceAttemptShuffleEndTimes.get(entry.getKey());
Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
@@ -1027,7 +1035,9 @@
Pre21JobHistoryConstants.Values stat;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.error("A task status you don't know about is \"" + status + "\".",
e);
@@ -1037,22 +1047,26 @@
task.setTaskStatus(stat);
try {
- typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
+ typ =
+ taskType == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(taskType);
} catch (IllegalArgumentException e) {
LOG.error("A task type you don't know about is \"" + taskType + "\".",
e);
typ = null;
}
-
+
if (typ == null) {
return;
}
task.setTaskType(typ);
- List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
- .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
- .getReduceTasks() : jobBeingTraced.getOtherTasks();
+ List<LoggedTask> vec =
+ typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+ .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE
+ ? jobBeingTraced.getReduceTasks() : jobBeingTraced
+ .getOtherTasks();
if (!taskAlreadyLogged) {
vec.add(task);
@@ -1066,8 +1080,8 @@
Pattern result = counterPatterns.get(counterName);
if (result == null) {
- String namePatternRegex = "\\[\\(" + counterName
- + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
+ String namePatternRegex =
+ "\\[\\(" + counterName + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
result = Pattern.compile(namePatternRegex);
counterPatterns.put(counterName, result);
}
@@ -1253,7 +1267,9 @@
Pre21JobHistoryConstants.Values stat = null;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.error("A map attempt status you don't know about is \"" + status
+ "\".", e);
@@ -1404,7 +1420,9 @@
Pre21JobHistoryConstants.Values stat = null;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.warn("A map attempt status you don't know about is \"" + status
+ "\".", e);
@@ -1632,8 +1650,8 @@
}
for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
- successAfterI[ent.getKey().intValue()] = ((double) ent.getValue())
- / totalSuccessfulAttempts;
+ successAfterI[ent.getKey().intValue()] =
+ ((double) ent.getValue()) / totalSuccessfulAttempts;
}
jobBeingTraced.setMapperTriesToSucceed(successAfterI);
} else {
@@ -1712,8 +1730,9 @@
}
if (spreading) {
- String ratioDescription = "(" + spreadMax + "/1000 %ile) to ("
- + spreadMin + "/1000 %ile) scaled by 1000000";
+ String ratioDescription =
+ "(" + spreadMax + "/1000 %ile) to (" + spreadMin
+ + "/1000 %ile) scaled by 1000000";
printDistributionSet(
"Map task success times " + ratioDescription + ":",
@@ -1737,8 +1756,8 @@
}
if (topologyGen != null) {
- LoggedNetworkTopology topo = new LoggedNetworkTopology(allHosts,
- "<root>", 0);
+ LoggedNetworkTopology topo =
+ new LoggedNetworkTopology(allHosts, "<root>", 0);
topologyGen.writeObject(topo);
topologyGen.close();
}
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+
+abstract class HistoryEventEmitter {
+ static final private Log LOG = LogFactory.getLog(HistoryEventEmitter.class);
+
+ abstract List<SingleEventEmitter> nonFinalSEEs();
+
+ abstract List<SingleEventEmitter> finalSEEs();
+
+ protected HistoryEventEmitter() {
+ // no code
+ }
+
+ enum PostEmitAction {
+ NONE, REMOVE_HEE
+  }
+
+ final Pair<Queue<HistoryEvent>, PostEmitAction> emitterCore(ParsedLine line,
+ String name) {
+ Queue<HistoryEvent> results = new LinkedList<HistoryEvent>();
+ PostEmitAction removeEmitter = PostEmitAction.NONE;
+ for (SingleEventEmitter see : nonFinalSEEs()) {
+ HistoryEvent event = see.maybeEmitEvent(line, name, this);
+ if (event != null) {
+ results.add(event);
+ }
+ }
+ for (SingleEventEmitter see : finalSEEs()) {
+ HistoryEvent event = see.maybeEmitEvent(line, name, this);
+ if (event != null) {
+ results.add(event);
+ removeEmitter = PostEmitAction.REMOVE_HEE;
+ break;
+ }
+ }
+ return new Pair<Queue<HistoryEvent>, PostEmitAction>(results, removeEmitter);
+ }
+
+ protected static Counters parseCounters(String counters)
+ throws ParseException {
+ if (counters == null) {
+ LOG.warn("HistoryEventEmitters: null counter detected:");
+ return null;
+ }
+
+ counters = counters.replace("\\.", "\\\\.");
+ counters = counters.replace("\\\\(", "\\(");
+ counters = counters.replace("\\\\)", "\\)");
+
+ org.apache.hadoop.mapred.Counters depForm =
+ org.apache.hadoop.mapred.Counters.fromEscapedCompactString(counters);
+
+ return new Counters(depForm);
+ }
+}
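
A minimal sketch of how a parser loop might drive emitterCore, keeping one HistoryEventEmitter per entity name and retiring it once a final SingleEventEmitter fires. The driver class is hypothetical, and Pair is assumed to expose first()/second() accessors; only the emitterCore contract itself (collect an event from every matching non-final emitter, stop at the first final one) comes from the code above:

    package org.apache.hadoop.tools.rumen;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    // Hypothetical driver: one HistoryEventEmitter per entity name, dropped
    // as soon as a final SingleEventEmitter fires (REMOVE_HEE).
    class SketchedEmitterDriver {
      private final Map<String, HistoryEventEmitter> liveEmitters =
          new HashMap<String, HistoryEventEmitter>();

      Queue<HistoryEvent> process(String name, ParsedLine line,
          HistoryEventEmitter freshEmitter) {
        HistoryEventEmitter hee = liveEmitters.get(name);
        if (hee == null) {
          hee = freshEmitter;
          liveEmitters.put(name, hee);
        }
        // emitterCore gathers events from the non-final emitters, then
        // stops at the first final emitter that produces an event.
        Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> out =
            hee.emitterCore(line, name); // assuming Pair.first()/second()
        if (out.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
          liveEmitters.remove(name); // entity finished; retire its emitter
        }
        return out.first();
      }
    }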
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link InputDemuxer} demultiplexes the input files into individual input
+ * streams.
+ */
+public interface InputDemuxer extends Closeable {
+ /**
+ * Bind the {@link InputDemuxer} to a particular file.
+ *
+ * @param path
+   *          The path to the file it should bind to.
+ * @param conf
+ * Configuration
+ * @throws IOException
+   *           if an I/O error occurs while reading the file. A file that
+   *           can be read but is in the wrong format does not raise this;
+   *           IOException is reserved for read errors.
+ */
+ public void bindTo(Path path, Configuration conf) throws IOException;
+
+ /**
+ * Get the next <name, input> pair. The name should preserve the original job
+   * history file or job conf file name. The input object should be closed
+   * before getNext() is called again; the old input object becomes invalid
+   * once getNext() is called again.
+ *
+ * @return the next <name, input> pair.
+ */
+ public Pair<String, InputStream> getNext() throws IOException;
+}
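
A short usage sketch of the contract above. It assumes getNext() signals exhaustion by returning null and that Pair exposes first()/second() accessors; both are assumptions, not part of the interface text, and the caller class is hypothetical:

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Hypothetical caller honoring the InputDemuxer contract: each stream
    // is closed before getNext() is called again.
    class SketchedDemuxerClient {
      static void drain(InputDemuxer demuxer, Path path, Configuration conf)
          throws IOException {
        demuxer.bindTo(path, conf);
        try {
          Pair<String, InputStream> next;
          while ((next = demuxer.getNext()) != null) { // assumed end signal
            InputStream in = next.second();
            try {
              // consume the stream, e.g. feed it to a job history parser
            } finally {
              in.close(); // must precede the next getNext() call
            }
          }
        } finally {
          demuxer.close();
        }
      }
    }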
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+
+public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+ static List<SingleEventEmitter> nonFinals =
+ new LinkedList<SingleEventEmitter>();
+ static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+ Long originalSubmitTime = null;
+
+ static {
+ nonFinals.add(new JobSubmittedEventEmitter());
+ nonFinals.add(new JobPriorityChangeEventEmitter());
+ nonFinals.add(new JobStatusChangedEventEmitter());
+ nonFinals.add(new JobInitedEventEmitter());
+ nonFinals.add(new JobInfoChangeEventEmitter());
+
+ finals.add(new JobUnsuccessfulCompletionEventEmitter());
+ finals.add(new JobFinishedEventEmitter());
+ }
+
+ Job20LineHistoryEventEmitter() {
+ super();
+ }
+
+ static private class JobSubmittedEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+ String submitTime = line.get("SUBMIT_TIME");
+ String jobConf = line.get("JOBCONF");
+ String user = line.get("USER");
+ String jobName = line.get("JOBNAME");
+
+ if (submitTime != null) {
+ Job20LineHistoryEventEmitter that =
+ (Job20LineHistoryEventEmitter) thatg;
+
+ that.originalSubmitTime = Long.parseLong(submitTime);
+
+ return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
+ : user, that.originalSubmitTime, jobConf);
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobPriorityChangeEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+ String priority = line.get("JOB_PRIORITY");
+
+ if (priority != null) {
+ return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobInitedEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+ if (jobIDName == null) {
+ return null;
+ }
+
+ JobID jobID = JobID.forName(jobIDName);
+
+ String launchTime = line.get("LAUNCH_TIME");
+ String status = line.get("JOB_STATUS");
+ String totalMaps = line.get("TOTAL_MAPS");
+ String totalReduces = line.get("TOTAL_REDUCES");
+
+ if (launchTime != null && totalMaps != null && totalReduces != null) {
+ return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
+ .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobStatusChangedEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+ if (jobIDName == null) {
+ return null;
+ }
+
+ JobID jobID = JobID.forName(jobIDName);
+
+ String status = line.get("JOB_STATUS");
+
+ if (status != null) {
+ return new JobStatusChangedEvent(jobID, status);
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobInfoChangeEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+ if (jobIDName == null) {
+ return null;
+ }
+
+ JobID jobID = JobID.forName(jobIDName);
+
+ String launchTime = line.get("LAUNCH_TIME");
+
+ if (launchTime != null) {
+ Job20LineHistoryEventEmitter that =
+ (Job20LineHistoryEventEmitter) thatg;
+ return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
+ .parseLong(launchTime));
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobUnsuccessfulCompletionEventEmitter extends
+ SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+ if (jobIDName == null) {
+ return null;
+ }
+
+ JobID jobID = JobID.forName(jobIDName);
+
+ String finishTime = line.get("FINISH_TIME");
+
+ String status = line.get("JOB_STATUS");
+
+ String finishedMaps = line.get("FINISHED_MAPS");
+ String finishedReduces = line.get("FINISHED_REDUCES");
+
+ if (status != null && !status.equalsIgnoreCase("success")
+ && finishTime != null && finishedMaps != null
+ && finishedReduces != null) {
+ return new JobUnsuccessfulCompletionEvent(jobID, Long
+ .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
+ .parseInt(finishedReduces), status);
+ }
+
+ return null;
+ }
+ }
+
+ static private class JobFinishedEventEmitter extends SingleEventEmitter {
+ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+ HistoryEventEmitter thatg) {
+ if (jobIDName == null) {
+ return null;
+ }
+
+ JobID jobID = JobID.forName(jobIDName);
+
+ String finishTime = line.get("FINISH_TIME");
+
+ String status = line.get("JOB_STATUS");
+
+ String finishedMaps = line.get("FINISHED_MAPS");
+ String finishedReduces = line.get("FINISHED_REDUCES");
+
+ String failedMaps = line.get("FAILED_MAPS");
+ String failedReduces = line.get("FAILED_REDUCES");
+
+ String counters = line.get("COUNTERS");
+
+ if (status != null && status.equalsIgnoreCase("success")
+ && finishTime != null && finishedMaps != null
+ && finishedReduces != null) {
+ try {
+ return new JobFinishedEvent(jobID, Long.parseLong(finishTime),
+ Integer.parseInt(finishedMaps),
+ Integer.parseInt(finishedReduces), Integer.parseInt(failedMaps),
+ Integer.parseInt(failedReduces), null, null,
+ parseCounters(counters));
+ } catch (ParseException e) {
+ return null;
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ List<SingleEventEmitter> finalSEEs() {
+ return finals;
+ }
+
+ @Override
+ List<SingleEventEmitter> nonFinalSEEs() {
+ return nonFinals;
+ }
+
+}
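
To make the flow above concrete, a hedged walk-through: a 0.20 "Job" line carrying SUBMIT_TIME yields a JobSubmittedEvent from the non-final emitters (and stashes originalSubmitTime for the later JobInfoChangeEvent), while a closing line with JOB_STATUS=SUCCESS plus finish counts trips the final JobFinishedEventEmitter, after which the emitter should be retired. How the ParsedLine objects are built is out of scope here, and Pair.first()/second() accessors are assumed:

    package org.apache.hadoop.tools.rumen;

    import java.util.Queue;

    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    // Hypothetical walk-through; the ParsedLine arguments are assumed to
    // come from a 0.20 job history parser.
    class SketchedJobLineWalk {
      static void walk(ParsedLine submitLine, ParsedLine finishLine,
          String jobIDName) {
        Job20LineHistoryEventEmitter emitter =
            new Job20LineHistoryEventEmitter();

        // Non-final emitters: a line with SUBMIT_TIME produces a
        // JobSubmittedEvent and records originalSubmitTime on the emitter.
        Queue<HistoryEvent> submitted =
            emitter.emitterCore(submitLine, jobIDName).first();

        // Final emitters: a SUCCESS line with finish counts produces a
        // JobFinishedEvent and asks the caller to retire this emitter.
        Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> done =
            emitter.emitterCore(finishLine, jobIDName);
        assert done.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE;
      }
    }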