Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/24 00:04:33 UTC

svn commit: r967285 - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/

Author: cdouglas
Date: Fri Jul 23 22:04:33 2010
New Revision: 967285

URL: http://svn.apache.org/viewvc?rev=967285&view=rev
Log:
MAPREDUCE-1925. Fix failing TestRumenJobTraces. Contributed by Ravi Gummadi

Removed:
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 22:04:33 2010
@@ -193,6 +193,8 @@ Trunk (unreleased changes)
     MAPREDUCE-1772. Corrects errors in streaming documentation in forrest.
     (amareshwari)
 
+    MAPREDUCE-1925. Fix failing TestRumenJobTraces. (Ravi Gummadi via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Jul 23 22:04:33 2010
@@ -18,11 +18,8 @@
 
 package org.apache.hadoop.tools.rumen;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -32,17 +29,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
@@ -54,7 +45,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -215,123 +205,38 @@ public class TestRumenJobTraces {
     final Configuration conf = new Configuration();
     final FileSystem lfs = FileSystem.getLocal(conf);
 
-    boolean success = false;
-
     final Path rootInputDir =
         new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
             lfs.getUri(), lfs.getWorkingDirectory());
-    final Path rootTempDir =
-        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
-            lfs.getUri(), lfs.getWorkingDirectory());
 
     final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
-    final Path tempDir = new Path(rootTempDir, "TestHadoop20JHParser");
-    lfs.delete(tempDir, true);
 
+    // history file to be parsed to get events
     final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
-    final Path goldPath =
-        new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
-
-    InputStream inputLogStream =
-        new PossiblyDecompressedInputStream(inputPath, conf);
 
-    InputStream inputGoldStream =
-        new PossiblyDecompressedInputStream(goldPath, conf);
-
-    BufferedInputStream bis = new BufferedInputStream(inputLogStream);
-    bis.mark(10000);
-    Hadoop20JHParser parser = new Hadoop20JHParser(bis);
-
-    final Path resultPath = new Path(tempDir, "result.text");
-
-    System.out.println("testHadoop20JHParser sent its output to " + resultPath);
-
-    Compressor compressor;
-
-    FileSystem fs = resultPath.getFileSystem(conf);
-    CompressionCodec codec =
-        new CompressionCodecFactory(conf).getCodec(resultPath);
-    OutputStream output;
-    if (codec != null) {
-      compressor = CodecPool.getCompressor(codec);
-      output = codec.createOutputStream(fs.create(resultPath), compressor);
-    } else {
-      output = fs.create(resultPath);
-    }
+    RewindableInputStream ris = getRewindableInputStream(inputPath, conf);
+    assertNotNull(ris);
 
-    PrintStream printStream = new PrintStream(output);
+    Hadoop20JHParser parser = null;
 
     try {
-      assertEquals("Hadoop20JHParser can't parse the test file", true,
-          Hadoop20JHParser.canParse(inputLogStream));
-
-      bis.reset();
-
-      HistoryEvent event = parser.nextEvent();
-
-      while (event != null) {
-        printStream.println(event.getClass().getCanonicalName());
-        event = parser.nextEvent();
-      }
-
-      printStream.close();
-
-      LineReader goldLines = new LineReader(inputGoldStream);
-      LineReader resultLines =
-          new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
-
-      int lineNumber = 1;
-
-      try {
-        Text goldLine = new Text();
-        Text resultLine = new Text();
-
-        int goldRead = goldLines.readLine(goldLine);
-        int resultRead = resultLines.readLine(resultLine);
-
-        while (goldRead * resultRead != 0) {
-          if (!goldLine.equals(resultLine)) {
-            assertEquals("Type mismatch detected", goldLine, resultLine);
-            break;
-          }
-
-          goldRead = goldLines.readLine(goldLine);
-          resultRead = resultLines.readLine(resultLine);
-
-          ++lineNumber;
-        }
-
-        if (goldRead != resultRead) {
-          assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
-              + " file contains more text at line " + lineNumber, goldRead,
-              resultRead);
-        }
-
-        success = true;
-      } finally {
-        goldLines.close();
-        resultLines.close();
-
-        if (success) {
-          lfs.delete(resultPath, false);
-        }
-      }
+      assertEquals("Hadoop20JHParser can't parse the test file " +
+          inputPath, true, Hadoop20JHParser.canParse(ris));
 
+      ris.rewind();
+      parser = new Hadoop20JHParser(ris);
+      ArrayList<String> seenEvents = new ArrayList<String>(150);
+
+      getHistoryEvents(parser, seenEvents); // get events into seenEvents
+
+      // Validate the events seen by the history parser from the
+      // history file v20-single-input-log.gz
+      validateSeenHistoryEvents(seenEvents, goldLines);
     } finally {
-      if (parser == null) {
-        inputLogStream.close();
-      } else {
-        if (parser != null) {
-          parser.close();
-        }
-      }
-
-      if (inputGoldStream != null) {
-        inputGoldStream.close();
+      if (parser != null) {
+        parser.close();
       }
-
-      // it's okay to do this twice [if we get an error on input]
-      printStream.close();
+      ris.close();
     }
   }
 
@@ -410,7 +315,7 @@ public class TestRumenJobTraces {
     Path outDir = new Path(tempDir, "output");
     JobHistoryParser parser = null;
     RewindableInputStream ris = null;
-    ArrayList<String> seenEvents = new ArrayList<String>(10);
+    ArrayList<String> seenEvents = new ArrayList<String>(15);
     
     try {
       JobConf jConf = mrCluster.createJobConf();
@@ -443,25 +348,23 @@ public class TestRumenJobTraces {
       }
     
       assertTrue("Missing job history file", lfs.exists(inputPath));
-    
-      InputDemuxer inputDemuxer = new DefaultInputDemuxer();
-      inputDemuxer.bindTo(inputPath, conf);
-    
-      Pair<String, InputStream> filePair = inputDemuxer.getNext();
-    
-      assertNotNull(filePair);
-    
-      ris = new RewindableInputStream(filePair.second());
+
+      ris = getRewindableInputStream(inputPath, conf);
 
       // Test if the JobHistoryParserFactory can detect the parser correctly
       parser = JobHistoryParserFactory.getParser(ris);
-        
-      HistoryEvent e;
-      while ((e = parser.nextEvent()) != null) {
-        String eventString = e.getEventType().toString();
-        System.out.println(eventString);
-        seenEvents.add(eventString);
-      }
+
+      getHistoryEvents(parser, seenEvents); // get events into seenEvents
+
+      // Check against the gold standard
+      System.out.println("testCurrentJHParser validating using gold std ");
+      // The list of history events expected when parsing the above job's
+      // history log file
+      String[] goldLinesExpected = new String[] {
+          JSE, JPCE, JIE, JSCE, TSE, ASE, MFE, TFE, TSE, ASE, RFE, TFE, JFE
+      };
+
+      validateSeenHistoryEvents(seenEvents, goldLinesExpected);
     } finally {
       // stop the MR cluster
       mrCluster.shutdown();
@@ -476,22 +379,6 @@ public class TestRumenJobTraces {
       // cleanup the filesystem
       lfs.delete(tempDir, true);
     }
-
-    // Check against the gold standard
-    System.out.println("testCurrentJHParser validating using gold std ");
-    String[] goldLines = new String[] {"JOB_SUBMITTED", "JOB_PRIORITY_CHANGED",
-        "JOB_INITED", "JOB_STATUS_CHANGED", "TASK_STARTED", 
-        "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "TASK_FINISHED", 
-        "TASK_STARTED", "REDUCE_ATTEMPT_STARTED", "REDUCE_ATTEMPT_FINISHED", 
-        "TASK_FINISHED", "JOB_FINISHED"};
-    
-    // Check the output with gold std
-    assertEquals("Size mismatch", goldLines.length, seenEvents.size());
-    
-    int index = 0;
-    for (String goldLine : goldLines) {
-      assertEquals("Content mismatch", goldLine, seenEvents.get(index++));
-    }
   }
   
   @Test
@@ -642,4 +529,95 @@ public class TestRumenJobTraces {
       IOUtils.cleanup(null, goldParser, resultParser);
     }
   }
+
+  /**
+   * Creates a {@link RewindableInputStream} for the given file path.
+   * @param inputPath the input file path
+   * @param conf configuration
+   * @return {@link RewindableInputStream}
+   * @throws IOException
+   */
+  private RewindableInputStream getRewindableInputStream(Path inputPath,
+      Configuration conf) throws IOException {
+
+    PossiblyDecompressedInputStream in =
+        new PossiblyDecompressedInputStream(inputPath, conf);
+
+    return new RewindableInputStream(in, BUFSIZE);
+  }
+
+  /**
+   * Uses the given history parser to parse the history events and places
+   * them in the given list.
+   * @param parser the job history parser
+   * @param events the job history events seen while parsing
+   * @throws IOException
+   */
+  private void getHistoryEvents(JobHistoryParser parser,
+      ArrayList<String> events) throws IOException {
+    HistoryEvent e;
+    while ((e = parser.nextEvent()) != null) {
+      String eventString = e.getClass().getSimpleName();
+      System.out.println(eventString);
+      events.add(eventString);
+    }
+  }
+
+  /**
+   * Validates that the history events seen match the expected events.
+   * @param seenEvents the list of history events seen
+   * @param goldLinesExpected  the expected history events
+   */
+  private void validateSeenHistoryEvents(ArrayList<String> seenEvents,
+      String[] goldLinesExpected) {
+
+    // Check the output with gold std
+    assertEquals("Number of events expected is different from the events seen"
+        + " by the history parser.",
+        goldLinesExpected.length, seenEvents.size());
+
+    int index = 0;
+    for (String goldLine : goldLinesExpected) {
+      assertEquals("History Event mismatch at line " + (index + 1),
+          goldLine, seenEvents.get(index));
+      index++;
+    }
+  }
+
+  final static int BUFSIZE = 8192; // 8K
+
+  // Any MapReduce job history event should be one of the following 16
+  final static String JSE = "JobSubmittedEvent";
+  final static String JPCE = "JobPriorityChangeEvent";
+  final static String JSCE = "JobStatusChangedEvent";
+  final static String JIE = "JobInitedEvent";
+  final static String JICE = "JobInfoChangeEvent";
+  static String TSE = "TaskStartedEvent";
+  static String ASE = "TaskAttemptStartedEvent";
+  static String AFE = "TaskAttemptFinishedEvent";
+  static String MFE = "MapAttemptFinishedEvent";
+  static String TUE = "TaskUpdatedEvent";
+  static String TFE = "TaskFinishedEvent";
+  static String JUCE = "JobUnsuccessfulCompletionEvent";
+  static String RFE = "ReduceAttemptFinishedEvent";
+  static String AUCE = "TaskAttemptUnsuccessfulCompletionEvent";
+  static String TFLE = "TaskFailedEvent";
+  static String JFE = "JobFinishedEvent";
+
+  // The expected job history events (in order) when parsing
+  // the job history file v20-single-input-log.gz
+  final static String[] goldLines = new String[] {
+      JSE, JPCE, JSCE, JIE, JICE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, TSE,
+      TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE,
+      TSE, TSE, TSE, TSE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE,
+      TFE, ASE, AFE, MFE, TUE, TFE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE,
+      MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE,
+      AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE,
+      ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AUCE, ASE, AFE,
+      MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE,
+      AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE,
+      ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, RFE, TUE,
+      TFE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, JFE
+  };
+
 }
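
For readers following the refactoring: the detect-then-rewind pattern that testHadoop20JHParser now uses looks roughly like the sketch below. It is illustrative only, not part of this commit; the ParserSketch/dumpEvents names and the 8192-byte buffer are made up here, while the RewindableInputStream, PossiblyDecompressedInputStream, and Hadoop20JHParser calls are the ones visible in the diff above.

    package org.apache.hadoop.tools.rumen; // the package-private helpers live here

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    class ParserSketch {
      // Illustrative helper (not part of this commit): parse a possibly
      // compressed v0.20 job history file and print each event's class name.
      static void dumpEvents(Path inputPath, Configuration conf)
          throws IOException {
        RewindableInputStream ris = new RewindableInputStream(
            new PossiblyDecompressedInputStream(inputPath, conf), 8192);
        Hadoop20JHParser parser = null;
        try {
          // canParse() consumes leading bytes of the stream to sniff the
          // format...
          if (!Hadoop20JHParser.canParse(ris)) {
            return;
          }
          // ...so rewind before constructing the parser. The buffer passed
          // above must cover everything canParse() read.
          ris.rewind();
          parser = new Hadoop20JHParser(ris);
          HistoryEvent e;
          while ((e = parser.nextEvent()) != null) {
            System.out.println(e.getClass().getSimpleName());
          }
        } finally {
          if (parser != null) {
            parser.close();
          }
          ris.close();
        }
      }
    }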

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Fri Jul 23 22:04:33 2010
@@ -33,7 +33,7 @@ public interface InputDemuxer extends Cl
    * Bind the {@link InputDemuxer} to a particular file.
    * 
    * @param path
-   *          The path to the find it should bind to.
+   *          The path to the file it should bind to.
    * @param conf
    *          Configuration
    * @throws IOException
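
As a usage note for the interface documented above, a consumer of InputDemuxer would look roughly like the sketch below, adapted from the code this commit removes from TestRumenJobTraces. The DemuxSketch/drain names are made up, and two details are assumptions not shown in this diff: that getNext() returns null once the input is exhausted, and the Pair.first() accessor.

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class DemuxSketch {
      // Sketch only: drain every (name, stream) pair from a demuxer bound
      // to a single history file.
      static void drain(Path inputPath, Configuration conf)
          throws IOException {
        InputDemuxer demuxer = new DefaultInputDemuxer();
        demuxer.bindTo(inputPath, conf);
        Pair<String, InputStream> pair;
        // Assumes getNext() returns null at end of input
        while ((pair = demuxer.getNext()) != null) {
          System.out.println("demuxed input: " + pair.first()); // assumed accessor
          pair.second().close();
        }
        demuxer.close(); // InputDemuxer extends Closeable
      }
    }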

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=967285&r1=967284&r2=967285&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Fri Jul 23 22:04:33 2010
@@ -62,11 +62,14 @@ class PossiblyDecompressedInputStream ex
 
   @Override
   public void close() throws IOException {
+    // coreInputStream.close() is called before returning the decompressor to
+    // the pool because coreInputStream.close() could (though currently it
+    // doesn't) access the decompressor.
+    coreInputStream.close();
+
     if (decompressor != null) {
       CodecPool.returnDecompressor(decompressor);
     }
-
-    coreInputStream.close();
   }
 
 }
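
The same close-before-recycle ordering applies on the write side. The sketch below is not part of this commit; it adapts the compression code removed from TestRumenJobTraces above to show the compressor analogue: finish the output stream before returning the pooled Compressor. The CompressedWriteSketch/write names are made up.

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.Compressor;

    class CompressedWriteSketch {
      // Sketch only: write through a possibly compressed stream, closing
      // the stream before recycling the pooled compressor, mirroring the
      // decompressor ordering fixed above.
      static void write(Path path, byte[] data, Configuration conf)
          throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        CompressionCodec codec =
            new CompressionCodecFactory(conf).getCodec(path);
        Compressor compressor = null;
        OutputStream out;
        if (codec != null) {
          compressor = CodecPool.getCompressor(codec);
          out = codec.createOutputStream(fs.create(path), compressor);
        } else {
          out = fs.create(path);
        }
        try {
          out.write(data);
        } finally {
          out.close(); // flush/finish while the compressor is still ours
          if (compressor != null) {
            CodecPool.returnCompressor(compressor); // now safe to recycle
          }
        }
      }
    }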