You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/11 13:36:24 UTC

svn commit: r953670 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/map...

Author: vinodkv
Date: Fri Jun 11 11:36:23 2010
New Revision: 953670

URL: http://svn.apache.org/viewvc?rev=953670&view=rev
Log:
MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. Contributed by Ravi Gummadi.

Removed:
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun 11 11:36:23 2010
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
     MAPREDUCE-1505. Create RPC client on job submission, not in cstr of Job
     instance. (Dick King via cdouglas)
 
+    MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. (Ravi Gummadi via vinodkv)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jun 11 11:36:23 2010
@@ -222,8 +222,6 @@ public abstract class PipeMapRed {
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       startTime_ = System.currentTimeMillis();
 
-      errThread_ = new MRErrorThread();
-      errThread_.start();
     } catch (IOException e) {
       logStackTrace(e);
       LOG.error("configuration exception", e);
@@ -338,7 +336,9 @@ public abstract class PipeMapRed {
     outReader_ = createOutputReader();
     outThread_ = new MROutputThread(outReader_, output, reporter);
     outThread_.start();
+    errThread_ = new MRErrorThread();
     errThread_.setReporter(reporter);
+    errThread_.start();
   }
   
   void waitOutputThreads() throws IOException {

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java Fri Jun 11 11:36:23 2010
@@ -25,8 +25,6 @@ import org.apache.hadoop.mapred.OutputCo
 
 import java.io.IOException;
 
-import org.apache.hadoop.util.ReflectionUtils;
-
 public class PipeMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                   Reporter reporter)

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java Fri Jun 11 11:36:23 2010
@@ -22,88 +22,303 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.File;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 
 /**
- * Tests for the ability of a streaming task to set the status
- * by writing "reporter:status:" lines to stderr. Uses MiniMR
- * since the local jobtracker doesn't track status.
+ * Tests if mapper/reducer with empty/nonempty input works properly if
+ * reporting is done using lines like "reporter:status:" and
+ * "reporter:counter:" before map()/reduce() method is called.
+ * Validates the task's log of STDERR if messages are written to stderr before
+ * map()/reduce() is called.
+ * Also validates job output.
+ * Uses MiniMR since the local jobtracker doesn't track task status. 
  */
 public class TestStreamingStatus {
-  private static String TEST_ROOT_DIR =
-    new File(System.getProperty("test.build.data","/tmp"))
+  protected static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data","/tmp"),
+    TestStreamingStatus.class.getSimpleName())
     .toURI().toString().replace(' ', '+');
   protected String INPUT_FILE = TEST_ROOT_DIR + "/input.txt";
   protected String OUTPUT_DIR = TEST_ROOT_DIR + "/out";
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
-  protected String map = StreamUtil.makeJavaCommand(StderrApp.class, new String[]{"3", "0", "0", "true"});
+  protected String map = null;
+  protected String reduce = null;
 
-  protected String[] genArgs(int jobtrackerPort) {
+  protected String scriptFile = TEST_ROOT_DIR + "/perlScript.pl";
+  protected String scriptFileName = new Path(scriptFile).toUri().getPath();
+
+
+  String expectedStderr = "my error msg before consuming input\n" +
+      "my error msg after consuming input\n";
+  String expectedOutput = null;// inited in setUp()
+  String expectedStatus = "before consuming input";
+
+  // This script does the following
+  // (a) setting task status before reading input
+  // (b) writing to stderr before reading input and after reading input
+  // (c) writing to stdout before reading input
+  // (d) incrementing user counter before reading input and after reading input
+  // Writing lines to stdout before reading input ((c) above) is there to
+  // validate the hanging-task issue that arises when the input to the task is
+  // empty (because the output thread is not started).
+  protected String script =
+    "#!/usr/bin/perl\n" +
+    "print STDERR \"reporter:status:" + expectedStatus + "\\n\";\n" +
+    "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n" +
+    "print STDERR \"my error msg before consuming input\\n\";\n" +
+    "for($count = 1500; $count >= 1; $count--) {print STDOUT \"$count \";}" +
+    "while(<STDIN>) {chomp;}\n" +
+    "print STDERR \"my error msg after consuming input\\n\";\n" +
+    "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n";
+
+  MiniMRCluster mr = null;
+  FileSystem fs = null;
+  JobConf conf = null;
+
+  /**
+   * Start the cluster, remove stale test files and build the expected output.
+   *
+   * @throws IOException
+   */
+  @Before
+  public void setUp() throws IOException {
+    conf = new JobConf();
+    conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+
+    mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
+
+    Path inFile = new Path(INPUT_FILE);
+    fs = inFile.getFileSystem(mr.createJobConf());
+    clean(fs);
+
+    buildExpectedJobOutput();
+  }
+
+  /**
+   * Clean up the test files and kill the cluster after the test is done.
+   */
+  @After
+  public void tearDown() {
+    if (fs != null) { clean(fs); }
+    if (mr != null) { mr.shutdown(); }
+  }
+
+  // Updates expectedOutput to have the expected job output as a string
+  void buildExpectedJobOutput() {
+    if (expectedOutput == null) {
+      expectedOutput = "";
+      for(int i = 1500; i >= 1; i--) {
+        expectedOutput = expectedOutput.concat(Integer.toString(i) + " ");
+      }
+      expectedOutput = expectedOutput.trim();
+    }
+  }
+
+  // Create empty/nonempty input file.
+  // Create script file with the specified content.
+  protected void createInputAndScript(boolean isEmptyInput,
+      String script) throws IOException {
+    makeInput(fs, isEmptyInput ? "" : input);
+
+    // create script file
+    DataOutputStream file = fs.create(new Path(scriptFileName));
+    file.writeBytes(script);
+    file.close();
+  }
+
+  protected String[] genArgs(int jobtrackerPort, String mapper, String reducer)
+  {
     return new String[] {
       "-input", INPUT_FILE,
       "-output", OUTPUT_DIR,
-      "-mapper", map,
+      "-mapper", mapper,
+      "-reducer", reducer,
       "-jobconf", MRJobConfig.NUM_MAPS + "=1",
-      "-jobconf", MRJobConfig.NUM_REDUCES + "=0",      
+      "-jobconf", MRJobConfig.NUM_REDUCES + "=1",
       "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
-      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+      "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
       "-jobconf", JTConfig.JT_IPC_ADDRESS + "=localhost:"+jobtrackerPort,
       "-jobconf", "fs.default.name=file:///"
     };
   }
-  
-  public void makeInput(FileSystem fs) throws IOException {
+
+  // create input file with the given content
+  public void makeInput(FileSystem fs, String input) throws IOException {
     Path inFile = new Path(INPUT_FILE);
     DataOutputStream file = fs.create(inFile);
     file.writeBytes(input);
     file.close();
   }
 
-  public void clean(FileSystem fs) {
+  // Delete output directory
+  protected void deleteOutDir(FileSystem fs) {
     try {
       Path outDir = new Path(OUTPUT_DIR);
       fs.delete(outDir, true);
     } catch (Exception e) {}
+  }
+
+  // Delete input file, script file and output directory
+  public void clean(FileSystem fs) {
+    deleteOutDir(fs);
     try {
-      Path inFile = new Path(INPUT_FILE);    
-      fs.delete(inFile, false);
-    } catch (Exception e) {}
+      Path file = new Path(INPUT_FILE);
+      if (fs.exists(file)) {
+        fs.delete(file, false);
+      }
+      file = new Path(scriptFile);
+      if (fs.exists(file)) {
+        fs.delete(file, false);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
-  
+
+  /**
+   * Check if mapper/reducer with empty/nonempty input works properly if
+   * reporting is done using lines like "reporter:status:" and
+   * "reporter:counter:" before map()/reduce() method is called.
+   * Validate the task's log of STDERR if messages are written
+   * to stderr before map()/reduce() is called.
+   * Also validate job output.
+   *
+   * @throws IOException
+   */
   @Test
-  public void testStreamingStatus() throws Exception {
-    MiniMRCluster mr = null;
-    FileSystem fs = null;
-    JobConf conf = new JobConf();
-    conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
-    try {
-      mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
+  public void testReporting() throws Exception {
+    testStreamJob(false);// nonempty input
+    testStreamJob(true);// empty input
+  }
+
+  /**
+   * Run a streaming job with the given script as mapper and validate.
+   * Run another streaming job with the given script as reducer and validate.
+   *
+   * @param isEmptyInput whether the input to the script should be empty.
+   *        The script content is taken from the {@link #script} field.
+   */
+  private void testStreamJob(boolean isEmptyInput)
+      throws IOException {
+
+      createInputAndScript(isEmptyInput, script);
 
-      Path inFile = new Path(INPUT_FILE);
-      fs = inFile.getFileSystem(mr.createJobConf());
+      // Check if streaming mapper works as expected
+      map = scriptFileName;
+      reduce = "/bin/cat";
+      runStreamJob(TaskType.MAP, isEmptyInput);
+      deleteOutDir(fs);
+
+      // Check if streaming reducer works as expected.
+      map = "/bin/cat";
+      reduce = scriptFileName;
+      runStreamJob(TaskType.REDUCE, isEmptyInput);
       clean(fs);
-      makeInput(fs);
-      
-      StreamJob job = new StreamJob();
-      int failed = job.run(genArgs(mr.getJobTrackerPort()));
-      assertEquals(0, failed);
-
-      TaskReport[] reports = job.jc_.getMapTaskReports(job.jobId_);
-      assertEquals(1, reports.length);
-      assertEquals("starting echo > sort", reports[0].getState());
-    } finally {
-      if (fs != null) { clean(fs); }
-      if (mr != null) { mr.shutdown(); }
+  }
+
+  // Run streaming job for the specified input file, mapper and reducer and
+  // (1) Validate if the job succeeds.
+  // (2) Validate if user counter is incremented properly for the cases of
+  //   (a) nonempty input to map
+  //   (b) empty input to map and
+  //   (c) nonempty input to reduce
+  // (3) Validate task status for the cases of (2)(a),(2)(b),(2)(c).
+  //     The empty-input-to-reduce case is excluded: its reporter is a dummy
+  //     that ignores all "reporter:status" and "reporter:counter" lines.
+  // (4) Validate stderr of task of given task type.
+  // (5) Validate job output
+  void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
+    boolean mayExit = false;
+    StreamJob job = new StreamJob(genArgs(
+        mr.getJobTrackerPort(), map, reduce), mayExit);
+    int returnValue = job.go();
+    assertEquals(0, returnValue);
+
+    // If the input to the reducer is empty, a dummy reporter (which ignores
+    // all reporting lines) is set for MRErrorThread in waitOutputThreads(), so
+    // expectedCounterValue is 0 for the empty-input-to-reducer case.
+    // The output of the reducer is also empty in that case.
+    int expectedCounterValue = 0;
+    if (type == TaskType.MAP || !isEmptyInput) {
+      validateTaskStatus(job, type);
+      // output is from "print STDOUT" statements in perl script
+      validateJobOutput(job.getConf());
+      expectedCounterValue = 2;
+    }
+    validateUserCounter(job, expectedCounterValue);
+    validateTaskStderr(job, type);
+
+    deleteOutDir(fs);
+  }
+
+  // validate task status of task of given type(validates 1st task of that type)
+  void validateTaskStatus(StreamJob job, TaskType type) throws IOException {
+    // Map Task has 2 phases: map, sort
+    // Reduce Task has 3 phases: copy, sort, reduce
+    String finalPhaseInTask;
+    TaskReport[] reports;
+    if (type == TaskType.MAP) {
+      reports = job.jc_.getMapTaskReports(job.jobId_);
+      finalPhaseInTask = "sort";
+    } else {// reduce task
+      reports = job.jc_.getReduceTaskReports(job.jobId_);
+      finalPhaseInTask = "reduce";
     }
+    assertEquals(1, reports.length);
+    assertEquals(expectedStatus + " > " + finalPhaseInTask,
+        reports[0].getState());
   }
+
+  // Validate the job output
+  void validateJobOutput(Configuration conf)
+      throws IOException {
+
+    String output = MapReduceTestUtil.readOutput(
+        new Path(OUTPUT_DIR), conf).trim();
+
+    assertTrue(output.equals(expectedOutput));
+  }
+
+  // Validate stderr task log of given task type(validates 1st
+  // task of that type).
+  void validateTaskStderr(StreamJob job, TaskType type)
+      throws IOException {
+    TaskAttemptID attemptId =
+        new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
+
+    String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
+        attemptId, false);
+
+    // trim() is called on expectedStderr here because the method
+    // MapReduceTestUtil.readTaskLog() returns trimmed String.
+    assertTrue(log.equals(expectedStderr.trim()));
+  }
+
+  // Validate if user counter is incremented properly
+  void validateUserCounter(StreamJob job, int expectedCounterValue)
+      throws IOException {
+    Counters counters = job.running_.getCounters();
+    assertEquals(expectedCounterValue, counters.findCounter(
+        "myOwnCounterGroup", "myOwnCounter").getValue());
+  }
+
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Fri Jun 11 11:36:23 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.util.Shell;
 
 import org.junit.Test;
@@ -131,7 +131,7 @@ public class TestStreamingTaskLog {
     assertEquals("StreamJob failed.", 0, returnStatus);
     
     // validate environment variables set for the child(script) of java process
-    String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
+    String env = MapReduceTestUtil.readOutput(outputPath, mr.createJobConf());
     long logSize = USERLOG_LIMIT_KB * 1024;
     assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
                && env.contains("-Dhadoop.tasklog.taskid=attempt_")

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Fri Jun 11 11:36:23 2010
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.StringUtils;
 
@@ -122,7 +122,7 @@ public class TestUlimit {
     boolean mayExit = false;
     StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
     job.go();
-    String output = TestMiniMRWithDFS.readOutput(outputPath,
+    String output = MapReduceTestUtil.readOutput(outputPath,
                                         mr.createJobConf());
     assertEquals("output is wrong", SET_MEMORY_LIMIT,
                                     output.trim());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Fri Jun 11 11:36:23 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -217,7 +218,7 @@ public abstract class NotificationTestCa
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
     JobClient.runJob(conf);
-    return TestMiniMRWithDFS.readOutput(outDir, conf);
+    return MapReduceTestUtil.readOutput(outDir, conf);
   }
 
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Fri Jun 11 11:36:23 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
 import org.apache.hadoop.mapreduce.lib.fieldsel.TestMRFieldSelection;
 
@@ -86,7 +87,7 @@ private static NumberFormat idFormat = N
     //
     boolean success = true;
     Path outPath = new Path(OUTPUT_DIR, "part-00000");
-    String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+    String outdata = MapReduceTestUtil.readOutput(outPath,job);
 
     assertEquals(expectedOutput.toString(),outdata);
     fs.delete(OUTPUT_DIR, true);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Fri Jun 11 11:36:23 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
@@ -93,7 +94,7 @@ public class TestJobSysDirWithDFS extend
     // Check if the Job Tracker system dir is propogated to client
     assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
     assertTrue(sysDir.contains("custom"));
-    return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
+    return new TestResult(job, MapReduceTestUtil.readOutput(outDir, conf));
   }
 
  static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir) 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Jun 11 11:36:23 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.MRCaching.TestResult;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TestMapReduceLocal;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -125,7 +126,7 @@ public class TestMiniMRLocalFS extends T
     try {
       JobClient.runJob(job);
       String result = 
-        TestMiniMRWithDFS.readOutput(outDir, job);
+        MapReduceTestUtil.readOutput(outDir, job);
       assertEquals("output", ("aunt annie\t1\n" +
                               "bumble boat\t4\n" +
                               "crocodile pants\t0\n" +

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Jun 11 11:36:23 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -97,32 +98,7 @@ public class TestMiniMRWithDFS extends T
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
     RunningJob job = JobClient.runJob(conf);
-    return new TestResult(job, readOutput(outDir, conf));
-  }
-
-  public static String readOutput(Path outDir, 
-                                  JobConf conf) throws IOException {
-    FileSystem fs = outDir.getFileSystem(conf);
-    StringBuffer result = new StringBuffer();
-    {
-      
-      Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new Utils.OutputFileUtils
-                                            .OutputFilesFilter()));
-      for(int i=0; i < fileList.length; ++i) {
-        LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
-        BufferedReader file = 
-          new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
-        String line = file.readLine();
-        while (line != null) {
-          result.append(line);
-          result.append("\n");
-          line = file.readLine();
-        }
-        file.close();
-      }
-    }
-    return result.toString();
+    return new TestResult(job, MapReduceTestUtil.readOutput(outDir, conf));
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java Fri Jun 11 11:36:23 2010
@@ -21,6 +21,8 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.*;
@@ -107,7 +109,7 @@ public class TestAggregates extends Test
     //
     boolean success = true;
     Path outPath = new Path(OUTPUT_DIR, "part-00000");
-    String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+    String outdata = MapReduceTestUtil.readOutput(outPath,job);
     System.out.println("full out data:");
     System.out.println(outdata.toString());
     outdata = outdata.substring(0, expectedOutput.toString().length());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java Fri Jun 11 11:36:23 2010
@@ -38,9 +38,9 @@ import org.apache.hadoop.mapred.FileOutp
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
 import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -199,7 +199,7 @@ public class TestPipes extends TestCase 
     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
     		                        new Utils.OutputFileUtils
     		                                 .OutputFilesFilter()))) {
-      results.add(TestMiniMRWithDFS.readOutput(p, job));
+      results.add(MapReduceTestUtil.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 
                  expectedResults.length, results.size());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jun 11 11:36:23 2010
@@ -396,7 +396,8 @@ public class MapReduceTestUtil {
       }
     };
   }
-  
+
+  // Return output of MR job by reading from the given output directory
   public static String readOutput(Path outDir, Configuration conf) 
       throws IOException {
     FileSystem fs = outDir.getFileSystem(conf);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java Fri Jun 11 11:36:23 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -112,7 +113,7 @@ public class TestMapReduceAggregates ext
     // original one.  Remember, we need to ignore zero-count items
     // in the original key.
     //
-    String outdata = readOutput(OUTPUT_DIR, conf);
+    String outdata = MapReduceTestUtil.readOutput(OUTPUT_DIR, conf);
     System.out.println("full out data:");
     System.out.println(outdata.toString());
     outdata = outdata.substring(0, expectedOutput.toString().length());
@@ -121,26 +122,6 @@ public class TestMapReduceAggregates ext
     fs.delete(OUTPUT_DIR, true);
     fs.delete(INPUT_DIR, true);
   }
-
-  public static String readOutput(Path outDir, Configuration conf) 
-    throws IOException {
-    FileSystem fs = outDir.getFileSystem(conf);
-    StringBuffer result = new StringBuffer();
-    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                        new Utils.OutputFileUtils.OutputFilesFilter()));
-    for(int i=0; i < fileList.length; ++i) {
-      BufferedReader file = 
-        new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
-      String line = file.readLine();
-      while (line != null) {
-        result.append(line);
-        result.append("\n");
-        line = file.readLine();
-      }
-      file.close();
-    }
-    return result.toString();
-  }
   
   /**
    * Launches all the tasks in order.