You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/24 00:04:33 UTC
svn commit: r967285 - in /hadoop/mapreduce/trunk: ./
src/test/mapred/org/apache/hadoop/tools/rumen/
src/test/tools/data/rumen/small-trace-test/
src/tools/org/apache/hadoop/tools/rumen/
Author: cdouglas
Date: Fri Jul 23 22:04:33 2010
New Revision: 967285
URL: http://svn.apache.org/viewvc?rev=967285&view=rev
Log:
MAPREDUCE-1925. Fix failing TestRumenJobTraces. Contributed by Ravi Gummadi
Removed:
hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 22:04:33 2010
@@ -193,6 +193,8 @@ Trunk (unreleased changes)
MAPREDUCE-1772. Corrects errors in streaming documentation in forrest.
(amareshwari)
+ MAPREDUCE-1925. Fix failing TestRumenJobTraces. (Ravi Gummadi via cdouglas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Jul 23 22:04:33 2010
@@ -18,11 +18,8 @@
package org.apache.hadoop.tools.rumen;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -32,17 +29,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
@@ -54,7 +45,6 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -215,123 +205,38 @@ public class TestRumenJobTraces {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
- boolean success = false;
-
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
- final Path rootTempDir =
- new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
- lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
- final Path tempDir = new Path(rootTempDir, "TestHadoop20JHParser");
- lfs.delete(tempDir, true);
+ // history file to be parsed to get events
final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
- final Path goldPath =
- new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
-
- InputStream inputLogStream =
- new PossiblyDecompressedInputStream(inputPath, conf);
- InputStream inputGoldStream =
- new PossiblyDecompressedInputStream(goldPath, conf);
-
- BufferedInputStream bis = new BufferedInputStream(inputLogStream);
- bis.mark(10000);
- Hadoop20JHParser parser = new Hadoop20JHParser(bis);
-
- final Path resultPath = new Path(tempDir, "result.text");
-
- System.out.println("testHadoop20JHParser sent its output to " + resultPath);
-
- Compressor compressor;
-
- FileSystem fs = resultPath.getFileSystem(conf);
- CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(resultPath);
- OutputStream output;
- if (codec != null) {
- compressor = CodecPool.getCompressor(codec);
- output = codec.createOutputStream(fs.create(resultPath), compressor);
- } else {
- output = fs.create(resultPath);
- }
+ RewindableInputStream ris = getRewindableInputStream(inputPath, conf);
+ assertNotNull(ris);
- PrintStream printStream = new PrintStream(output);
+ Hadoop20JHParser parser = null;
try {
- assertEquals("Hadoop20JHParser can't parse the test file", true,
- Hadoop20JHParser.canParse(inputLogStream));
-
- bis.reset();
-
- HistoryEvent event = parser.nextEvent();
-
- while (event != null) {
- printStream.println(event.getClass().getCanonicalName());
- event = parser.nextEvent();
- }
-
- printStream.close();
-
- LineReader goldLines = new LineReader(inputGoldStream);
- LineReader resultLines =
- new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
-
- int lineNumber = 1;
-
- try {
- Text goldLine = new Text();
- Text resultLine = new Text();
-
- int goldRead = goldLines.readLine(goldLine);
- int resultRead = resultLines.readLine(resultLine);
-
- while (goldRead * resultRead != 0) {
- if (!goldLine.equals(resultLine)) {
- assertEquals("Type mismatch detected", goldLine, resultLine);
- break;
- }
-
- goldRead = goldLines.readLine(goldLine);
- resultRead = resultLines.readLine(resultLine);
-
- ++lineNumber;
- }
-
- if (goldRead != resultRead) {
- assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
- + " file contains more text at line " + lineNumber, goldRead,
- resultRead);
- }
-
- success = true;
- } finally {
- goldLines.close();
- resultLines.close();
-
- if (success) {
- lfs.delete(resultPath, false);
- }
- }
+ assertEquals("Hadoop20JHParser can't parse the test file " +
+ inputPath, true, Hadoop20JHParser.canParse(ris));
+ ris.rewind();
+ parser = new Hadoop20JHParser(ris);
+ ArrayList<String> seenEvents = new ArrayList<String>(150);
+
+ getHistoryEvents(parser, seenEvents); // get events into seenEvents
+
+ // Validate the events seen by history parser from
+ // history file v20-single-input-log.gz
+ validateSeenHistoryEvents(seenEvents, goldLines);
} finally {
- if (parser == null) {
- inputLogStream.close();
- } else {
- if (parser != null) {
- parser.close();
- }
- }
-
- if (inputGoldStream != null) {
- inputGoldStream.close();
+ if (parser != null) {
+ parser.close();
}
-
- // it's okay to do this twice [if we get an error on input]
- printStream.close();
+ ris.close();
}
}
@@ -410,7 +315,7 @@ public class TestRumenJobTraces {
Path outDir = new Path(tempDir, "output");
JobHistoryParser parser = null;
RewindableInputStream ris = null;
- ArrayList<String> seenEvents = new ArrayList<String>(10);
+ ArrayList<String> seenEvents = new ArrayList<String>(15);
try {
JobConf jConf = mrCluster.createJobConf();
@@ -443,25 +348,23 @@ public class TestRumenJobTraces {
}
assertTrue("Missing job history file", lfs.exists(inputPath));
-
- InputDemuxer inputDemuxer = new DefaultInputDemuxer();
- inputDemuxer.bindTo(inputPath, conf);
-
- Pair<String, InputStream> filePair = inputDemuxer.getNext();
-
- assertNotNull(filePair);
-
- ris = new RewindableInputStream(filePair.second());
+
+ ris = getRewindableInputStream(inputPath, conf);
// Test if the JobHistoryParserFactory can detect the parser correctly
parser = JobHistoryParserFactory.getParser(ris);
-
- HistoryEvent e;
- while ((e = parser.nextEvent()) != null) {
- String eventString = e.getEventType().toString();
- System.out.println(eventString);
- seenEvents.add(eventString);
- }
+
+ getHistoryEvents(parser, seenEvents); // get events into seenEvents
+
+ // Check against the gold standard
+ System.out.println("testCurrentJHParser validating using gold std ");
+ // The list of history events expected when parsing the above job's
+ // history log file
+ String[] goldLinesExpected = new String[] {
+ JSE, JPCE, JIE, JSCE, TSE, ASE, MFE, TFE, TSE, ASE, RFE, TFE, JFE
+ };
+
+ validateSeenHistoryEvents(seenEvents, goldLinesExpected);
} finally {
// stop the MR cluster
mrCluster.shutdown();
@@ -476,22 +379,6 @@ public class TestRumenJobTraces {
// cleanup the filesystem
lfs.delete(tempDir, true);
}
-
- // Check against the gold standard
- System.out.println("testCurrentJHParser validating using gold std ");
- String[] goldLines = new String[] {"JOB_SUBMITTED", "JOB_PRIORITY_CHANGED",
- "JOB_INITED", "JOB_STATUS_CHANGED", "TASK_STARTED",
- "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "TASK_FINISHED",
- "TASK_STARTED", "REDUCE_ATTEMPT_STARTED", "REDUCE_ATTEMPT_FINISHED",
- "TASK_FINISHED", "JOB_FINISHED"};
-
- // Check the output with gold std
- assertEquals("Size mismatch", goldLines.length, seenEvents.size());
-
- int index = 0;
- for (String goldLine : goldLines) {
- assertEquals("Content mismatch", goldLine, seenEvents.get(index++));
- }
}
@Test
@@ -642,4 +529,95 @@ public class TestRumenJobTraces {
IOUtils.cleanup(null, goldParser, resultParser);
}
}
+
+ /**
+ * Creates {@link RewindableInputStream} for the given file path.
+ * @param inputPath the input file path
+ * @param conf configuration
+ * @return {@link RewindableInputStream}
+ * @throws IOException
+ */
+ private RewindableInputStream getRewindableInputStream(Path inputPath,
+ Configuration conf) throws IOException {
+
+ PossiblyDecompressedInputStream in =
+ new PossiblyDecompressedInputStream(inputPath, conf);
+
+ return new RewindableInputStream(in, BUFSIZE);
+ }
+
+ /**
+ * Lets the given history parser parse the history events and places them in
+ * the given list.
+ * @param parser the job history parser
+ * @param events the job history events seen while parsing
+ * @throws IOException
+ */
+ private void getHistoryEvents(JobHistoryParser parser,
+ ArrayList<String> events) throws IOException {
+ HistoryEvent e;
+ while ((e = parser.nextEvent()) != null) {
+ String eventString = e.getClass().getSimpleName();
+ System.out.println(eventString);
+ events.add(eventString);
+ }
+ }
+
+ /**
+ * Validate if history events seen are as expected
+ * @param seenEvents the list of history events seen
+ * @param goldLinesExpected the expected history events
+ */
+ private void validateSeenHistoryEvents(ArrayList<String> seenEvents,
+ String[] goldLinesExpected) {
+
+ // Check the output with gold std
+ assertEquals("Number of events expected is different from the events seen"
+ + " by the history parser.",
+ goldLinesExpected.length, seenEvents.size());
+
+ int index = 0;
+ for (String goldLine : goldLinesExpected) {
+ assertEquals("History Event mismatch at line " + (index + 1),
+ goldLine, seenEvents.get(index));
+ index++;
+ }
+ }
+
+ final static int BUFSIZE = 8192; // 8K
+
+ // Any Map Reduce Job History Event should be 1 of the following 16
+ final static String JSE = "JobSubmittedEvent";
+ final static String JPCE = "JobPriorityChangeEvent";
+ final static String JSCE = "JobStatusChangedEvent";
+ final static String JIE = "JobInitedEvent";
+ final static String JICE = "JobInfoChangeEvent";
+ static String TSE = "TaskStartedEvent";
+ static String ASE = "TaskAttemptStartedEvent";
+ static String AFE = "TaskAttemptFinishedEvent";
+ static String MFE = "MapAttemptFinishedEvent";
+ static String TUE = "TaskUpdatedEvent";
+ static String TFE = "TaskFinishedEvent";
+ static String JUCE = "JobUnsuccessfulCompletionEvent";
+ static String RFE = "ReduceAttemptFinishedEvent";
+ static String AUCE = "TaskAttemptUnsuccessfulCompletionEvent";
+ static String TFLE = "TaskFailedEvent";
+ static String JFE = "JobFinishedEvent";
+
+ // The expected job history events (in order) when parsing
+ // the job history file v20-single-input-log.gz
+ final static String[] goldLines = new String[] {
+ JSE, JPCE, JSCE, JIE, JICE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, TSE,
+ TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE,
+ TSE, TSE, TSE, TSE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE,
+ TFE, ASE, AFE, MFE, TUE, TFE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE,
+ MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE,
+ AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE,
+ ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AUCE, ASE, AFE,
+ MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE,
+ AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE,
+ ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, RFE, TUE,
+ TFE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, JFE
+ };
+
}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Fri Jul 23 22:04:33 2010
@@ -33,7 +33,7 @@ public interface InputDemuxer extends Cl
* Bind the {@link InputDemuxer} to a particular file.
*
* @param path
- * The path to the find it should bind to.
+ * The path to the file it should bind to.
* @param conf
* Configuration
* @throws IOException
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Fri Jul 23 22:04:33 2010
@@ -62,11 +62,14 @@ class PossiblyDecompressedInputStream ex
@Override
public void close() throws IOException {
+ // coreInputStream.close() is called before returning the decompressor to the
+ // pool because coreInputStream.close() could (though currently it doesn't)
+ // access the decompressor.
+ coreInputStream.close();
+
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
-
- coreInputStream.close();
}
}