You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/11 13:36:24 UTC
svn commit: r953670 - in /hadoop/mapreduce/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/map...
Author: vinodkv
Date: Fri Jun 11 11:36:23 2010
New Revision: 953670
URL: http://svn.apache.org/viewvc?rev=953670&view=rev
Log:
MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. Contributed by Ravi Gummadi.
Removed:
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun 11 11:36:23 2010
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
MAPREDUCE-1505. Create RPC client on job submission, not in cstr of Job
instance. (Dick King via cdouglas)
+ MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. (Ravi Gummadi via vinodkv)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jun 11 11:36:23 2010
@@ -222,8 +222,6 @@ public abstract class PipeMapRed {
clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
startTime_ = System.currentTimeMillis();
- errThread_ = new MRErrorThread();
- errThread_.start();
} catch (IOException e) {
logStackTrace(e);
LOG.error("configuration exception", e);
@@ -338,7 +336,9 @@ public abstract class PipeMapRed {
outReader_ = createOutputReader();
outThread_ = new MROutputThread(outReader_, output, reporter);
outThread_.start();
+ errThread_ = new MRErrorThread();
errThread_.setReporter(reporter);
+ errThread_.start();
}
void waitOutputThreads() throws IOException {
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java Fri Jun 11 11:36:23 2010
@@ -25,8 +25,6 @@ import org.apache.hadoop.mapred.OutputCo
import java.io.IOException;
-import org.apache.hadoop.util.ReflectionUtils;
-
public class PipeMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java Fri Jun 11 11:36:23 2010
@@ -22,88 +22,303 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.File;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
/**
- * Tests for the ability of a streaming task to set the status
- * by writing "reporter:status:" lines to stderr. Uses MiniMR
- * since the local jobtracker doesn't track status.
+ * Tests if mapper/reducer with empty/nonempty input works properly if
+ * reporting is done using lines like "reporter:status:" and
+ * "reporter:counter:" before map()/reduce() method is called.
+ * Validates the task's log of STDERR if messages are written to stderr before
+ * map()/reduce() is called.
+ * Also validates job output.
+ * Uses MiniMR since the local jobtracker doesn't track task status.
*/
public class TestStreamingStatus {
- private static String TEST_ROOT_DIR =
- new File(System.getProperty("test.build.data","/tmp"))
+ protected static String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data","/tmp"),
+ TestStreamingStatus.class.getSimpleName())
.toURI().toString().replace(' ', '+');
protected String INPUT_FILE = TEST_ROOT_DIR + "/input.txt";
protected String OUTPUT_DIR = TEST_ROOT_DIR + "/out";
protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
- protected String map = StreamUtil.makeJavaCommand(StderrApp.class, new String[]{"3", "0", "0", "true"});
+ protected String map = null;
+ protected String reduce = null;
- protected String[] genArgs(int jobtrackerPort) {
+ protected String scriptFile = TEST_ROOT_DIR + "/perlScript.pl";
+ protected String scriptFileName = new Path(scriptFile).toUri().getPath();
+
+
+ String expectedStderr = "my error msg before consuming input\n" +
+ "my error msg after consuming input\n";
+ String expectedOutput = null;// inited in setUp()
+ String expectedStatus = "before consuming input";
+
+ // This script does the following
+ // (a) setting task status before reading input
+ // (b) writing to stderr before reading input and after reading input
+ // (c) writing to stdout before reading input
+ // (d) incrementing user counter before reading input and after reading input
+ // Writing lines to stdout before reading input {(c) above} is to validate
+ // the hanging-task issue seen when the input to a task is empty (because the
+ // output thread was not started).
+ protected String script =
+ "#!/usr/bin/perl\n" +
+ "print STDERR \"reporter:status:" + expectedStatus + "\\n\";\n" +
+ "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n" +
+ "print STDERR \"my error msg before consuming input\\n\";\n" +
+ "for($count = 1500; $count >= 1; $count--) {print STDOUT \"$count \";}" +
+ "while(<STDIN>) {chomp;}\n" +
+ "print STDERR \"my error msg after consuming input\\n\";\n" +
+ "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n";
+
+ MiniMRCluster mr = null;
+ FileSystem fs = null;
+ JobConf conf = null;
+
+ /**
+ * Start the cluster and create input file before running the actual test.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void setUp() throws IOException {
+ conf = new JobConf();
+ conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+
+ mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
+
+ Path inFile = new Path(INPUT_FILE);
+ fs = inFile.getFileSystem(mr.createJobConf());
+ clean(fs);
+
+ buildExpectedJobOutput();
+ }
+
+ /**
+ * Kill the cluster after the test is done.
+ */
+ @After
+ public void tearDown() {
+ if (fs != null) { clean(fs); }
+ if (mr != null) { mr.shutdown(); }
+ }
+
+ // Updates expectedOutput to have the expected job output as a string
+ void buildExpectedJobOutput() {
+ if (expectedOutput == null) {
+ expectedOutput = "";
+ for(int i = 1500; i >= 1; i--) {
+ expectedOutput = expectedOutput.concat(Integer.toString(i) + " ");
+ }
+ expectedOutput = expectedOutput.trim();
+ }
+ }
+
+ // Create empty/nonempty input file.
+ // Create script file with the specified content.
+ protected void createInputAndScript(boolean isEmptyInput,
+ String script) throws IOException {
+ makeInput(fs, isEmptyInput ? "" : input);
+
+ // create script file
+ DataOutputStream file = fs.create(new Path(scriptFileName));
+ file.writeBytes(script);
+ file.close();
+ }
+
+ protected String[] genArgs(int jobtrackerPort, String mapper, String reducer)
+ {
return new String[] {
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
- "-mapper", map,
+ "-mapper", mapper,
+ "-reducer", reducer,
"-jobconf", MRJobConfig.NUM_MAPS + "=1",
- "-jobconf", MRJobConfig.NUM_REDUCES + "=0",
+ "-jobconf", MRJobConfig.NUM_REDUCES + "=1",
"-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+ "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
"-jobconf", JTConfig.JT_IPC_ADDRESS + "=localhost:"+jobtrackerPort,
"-jobconf", "fs.default.name=file:///"
};
}
-
- public void makeInput(FileSystem fs) throws IOException {
+
+ // create input file with the given content
+ public void makeInput(FileSystem fs, String input) throws IOException {
Path inFile = new Path(INPUT_FILE);
DataOutputStream file = fs.create(inFile);
file.writeBytes(input);
file.close();
}
- public void clean(FileSystem fs) {
+ // Delete output directory
+ protected void deleteOutDir(FileSystem fs) {
try {
Path outDir = new Path(OUTPUT_DIR);
fs.delete(outDir, true);
} catch (Exception e) {}
+ }
+
+ // Delete input file, script file and output directory
+ public void clean(FileSystem fs) {
+ deleteOutDir(fs);
try {
- Path inFile = new Path(INPUT_FILE);
- fs.delete(inFile, false);
- } catch (Exception e) {}
+ Path file = new Path(INPUT_FILE);
+ if (fs.exists(file)) {
+ fs.delete(file, false);
+ }
+ file = new Path(scriptFile);
+ if (fs.exists(file)) {
+ fs.delete(file, false);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
-
+
+ /**
+ * Check if mapper/reducer with empty/nonempty input works properly if
+ * reporting is done using lines like "reporter:status:" and
+ * "reporter:counter:" before map()/reduce() method is called.
+ * Validate the task's log of STDERR if messages are written
+ * to stderr before map()/reduce() is called.
+ * Also validate job output.
+ *
+ * @throws IOException
+ */
@Test
- public void testStreamingStatus() throws Exception {
- MiniMRCluster mr = null;
- FileSystem fs = null;
- JobConf conf = new JobConf();
- conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
- try {
- mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
+ public void testReporting() throws Exception {
+ testStreamJob(false);// nonempty input
+ testStreamJob(true);// empty input
+ }
+
+ /**
+ * Run a streaming job with the given script as mapper and validate.
+ * Run another streaming job with the given script as reducer and validate.
+ *
+ * @param isEmptyInput Should the input to the script be empty ?
+ */
+ private void testStreamJob(boolean isEmptyInput)
+ throws IOException {
+
+ createInputAndScript(isEmptyInput, script);
- Path inFile = new Path(INPUT_FILE);
- fs = inFile.getFileSystem(mr.createJobConf());
+ // Check if streaming mapper works as expected
+ map = scriptFileName;
+ reduce = "/bin/cat";
+ runStreamJob(TaskType.MAP, isEmptyInput);
+ deleteOutDir(fs);
+
+ // Check if streaming reducer works as expected.
+ map = "/bin/cat";
+ reduce = scriptFileName;
+ runStreamJob(TaskType.REDUCE, isEmptyInput);
clean(fs);
- makeInput(fs);
-
- StreamJob job = new StreamJob();
- int failed = job.run(genArgs(mr.getJobTrackerPort()));
- assertEquals(0, failed);
-
- TaskReport[] reports = job.jc_.getMapTaskReports(job.jobId_);
- assertEquals(1, reports.length);
- assertEquals("starting echo > sort", reports[0].getState());
- } finally {
- if (fs != null) { clean(fs); }
- if (mr != null) { mr.shutdown(); }
+ }
+
+ // Run streaming job for the specified input file, mapper and reducer and
+ // (1) Validate if the job succeeds.
+ // (2) Validate if user counter is incremented properly for the cases of
+ // (a) nonempty input to map
+ // (b) empty input to map and
+ // (c) nonempty input to reduce
+ // (3) Validate task status for the cases of (2)(a),(2)(b),(2)(c).
+ // Because empty input to reduce task => reporter is dummy and ignores
+ // all "reporter:status" and "reporter:counter" lines.
+ // (4) Validate stderr of task of given task type.
+ // (5) Validate job output
+ void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
+ boolean mayExit = false;
+ StreamJob job = new StreamJob(genArgs(
+ mr.getJobTrackerPort(), map, reduce), mayExit);
+ int returnValue = job.go();
+ assertEquals(0, returnValue);
+
+ // If input to reducer is empty, dummy reporter(which ignores all
+ // reporting lines) is set for MRErrorThread in waitOutputThreads(). So
+ // expectedCounterValue is 0 for empty-input-to-reducer case.
+ // Output of reducer is also empty for empty-input-to-reducer case.
+ int expectedCounterValue = 0;
+ if (type == TaskType.MAP || !isEmptyInput) {
+ validateTaskStatus(job, type);
+ // output is from "print STDOUT" statements in perl script
+ validateJobOutput(job.getConf());
+ expectedCounterValue = 2;
+ }
+ validateUserCounter(job, expectedCounterValue);
+ validateTaskStderr(job, type);
+
+ deleteOutDir(fs);
+ }
+
+ // validate task status of task of given type(validates 1st task of that type)
+ void validateTaskStatus(StreamJob job, TaskType type) throws IOException {
+ // Map Task has 2 phases: map, sort
+ // Reduce Task has 3 phases: copy, sort, reduce
+ String finalPhaseInTask;
+ TaskReport[] reports;
+ if (type == TaskType.MAP) {
+ reports = job.jc_.getMapTaskReports(job.jobId_);
+ finalPhaseInTask = "sort";
+ } else {// reduce task
+ reports = job.jc_.getReduceTaskReports(job.jobId_);
+ finalPhaseInTask = "reduce";
}
+ assertEquals(1, reports.length);
+ assertEquals(expectedStatus + " > " + finalPhaseInTask,
+ reports[0].getState());
}
+
+ // Validate the job output
+ void validateJobOutput(Configuration conf)
+ throws IOException {
+
+ String output = MapReduceTestUtil.readOutput(
+ new Path(OUTPUT_DIR), conf).trim();
+
+ assertTrue(output.equals(expectedOutput));
+ }
+
+ // Validate stderr task log of given task type(validates 1st
+ // task of that type).
+ void validateTaskStderr(StreamJob job, TaskType type)
+ throws IOException {
+ TaskAttemptID attemptId =
+ new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
+
+ String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
+ attemptId, false);
+
+ // trim() is called on expectedStderr here because the method
+ + // MapReduceTestUtil.readTaskLog() returns a trimmed String.
+ assertTrue(log.equals(expectedStderr.trim()));
+ }
+
+ // Validate if user counter is incremented properly
+ void validateUserCounter(StreamJob job, int expectedCounterValue)
+ throws IOException {
+ Counters counters = job.running_.getCounters();
+ assertEquals(expectedCounterValue, counters.findCounter(
+ "myOwnCounterGroup", "myOwnCounter").getValue());
+ }
+
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Fri Jun 11 11:36:23 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.util.Shell;
import org.junit.Test;
@@ -131,7 +131,7 @@ public class TestStreamingTaskLog {
assertEquals("StreamJob failed.", 0, returnStatus);
// validate environment variables set for the child(script) of java process
- String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
+ String env = MapReduceTestUtil.readOutput(outputPath, mr.createJobConf());
long logSize = USERLOG_LIMIT_KB * 1024;
assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
&& env.contains("-Dhadoop.tasklog.taskid=attempt_")
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Fri Jun 11 11:36:23 2010
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.StringUtils;
@@ -122,7 +122,7 @@ public class TestUlimit {
boolean mayExit = false;
StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
job.go();
- String output = TestMiniMRWithDFS.readOutput(outputPath,
+ String output = MapReduceTestUtil.readOutput(outputPath,
mr.createJobConf());
assertEquals("output is wrong", SET_MEMORY_LIMIT,
output.trim());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Fri Jun 11 11:36:23 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -217,7 +218,7 @@ public abstract class NotificationTestCa
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
JobClient.runJob(conf);
- return TestMiniMRWithDFS.readOutput(outDir, conf);
+ return MapReduceTestUtil.readOutput(outDir, conf);
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Fri Jun 11 11:36:23 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.TestMRFieldSelection;
@@ -86,7 +87,7 @@ private static NumberFormat idFormat = N
//
boolean success = true;
Path outPath = new Path(OUTPUT_DIR, "part-00000");
- String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+ String outdata = MapReduceTestUtil.readOutput(outPath,job);
assertEquals(expectedOutput.toString(),outdata);
fs.delete(OUTPUT_DIR, true);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Fri Jun 11 11:36:23 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
/**
@@ -93,7 +94,7 @@ public class TestJobSysDirWithDFS extend
// Check if the Job Tracker system dir is propogated to client
assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
assertTrue(sysDir.contains("custom"));
- return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
+ return new TestResult(job, MapReduceTestUtil.readOutput(outDir, conf));
}
static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir)
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Jun 11 11:36:23 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.MRCaching.TestResult;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TestMapReduceLocal;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -125,7 +126,7 @@ public class TestMiniMRLocalFS extends T
try {
JobClient.runJob(job);
String result =
- TestMiniMRWithDFS.readOutput(outDir, job);
+ MapReduceTestUtil.readOutput(outDir, job);
assertEquals("output", ("aunt annie\t1\n" +
"bumble boat\t4\n" +
"crocodile pants\t0\n" +
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Jun 11 11:36:23 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
@@ -97,32 +98,7 @@ public class TestMiniMRWithDFS extends T
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
RunningJob job = JobClient.runJob(conf);
- return new TestResult(job, readOutput(outDir, conf));
- }
-
- public static String readOutput(Path outDir,
- JobConf conf) throws IOException {
- FileSystem fs = outDir.getFileSystem(conf);
- StringBuffer result = new StringBuffer();
- {
-
- Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
- new Utils.OutputFileUtils
- .OutputFilesFilter()));
- for(int i=0; i < fileList.length; ++i) {
- LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
- BufferedReader file =
- new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
- String line = file.readLine();
- while (line != null) {
- result.append(line);
- result.append("\n");
- line = file.readLine();
- }
- file.close();
- }
- }
- return result.toString();
+ return new TestResult(job, MapReduceTestUtil.readOutput(outDir, conf));
}
/**
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java Fri Jun 11 11:36:23 2010
@@ -21,6 +21,8 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
import junit.framework.TestCase;
import java.io.*;
import java.util.*;
@@ -107,7 +109,7 @@ public class TestAggregates extends Test
//
boolean success = true;
Path outPath = new Path(OUTPUT_DIR, "part-00000");
- String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+ String outdata = MapReduceTestUtil.readOutput(outPath,job);
System.out.println("full out data:");
System.out.println(outdata.toString());
outdata = outdata.substring(0, expectedOutput.toString().length());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java Fri Jun 11 11:36:23 2010
@@ -38,9 +38,9 @@ import org.apache.hadoop.mapred.FileOutp
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TestMiniMRWithDFS;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -199,7 +199,7 @@ public class TestPipes extends TestCase
for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
new Utils.OutputFileUtils
.OutputFilesFilter()))) {
- results.add(TestMiniMRWithDFS.readOutput(p, job));
+ results.add(MapReduceTestUtil.readOutput(p, job));
}
assertEquals("number of reduces is wrong",
expectedResults.length, results.size());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jun 11 11:36:23 2010
@@ -396,7 +396,8 @@ public class MapReduceTestUtil {
}
};
}
-
+
+ // Return output of MR job by reading from the given output directory
public static String readOutput(Path outDir, Configuration conf)
throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java?rev=953670&r1=953669&r2=953670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java Fri Jun 11 11:36:23 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -112,7 +113,7 @@ public class TestMapReduceAggregates ext
// original one. Remember, we need to ignore zero-count items
// in the original key.
//
- String outdata = readOutput(OUTPUT_DIR, conf);
+ String outdata = MapReduceTestUtil.readOutput(OUTPUT_DIR, conf);
System.out.println("full out data:");
System.out.println(outdata.toString());
outdata = outdata.substring(0, expectedOutput.toString().length());
@@ -121,26 +122,6 @@ public class TestMapReduceAggregates ext
fs.delete(OUTPUT_DIR, true);
fs.delete(INPUT_DIR, true);
}
-
- public static String readOutput(Path outDir, Configuration conf)
- throws IOException {
- FileSystem fs = outDir.getFileSystem(conf);
- StringBuffer result = new StringBuffer();
- Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
- new Utils.OutputFileUtils.OutputFilesFilter()));
- for(int i=0; i < fileList.length; ++i) {
- BufferedReader file =
- new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
- String line = file.readLine();
- while (line != null) {
- result.append(line);
- result.append("\n");
- line = file.readLine();
- }
- file.close();
- }
- return result.toString();
- }
/**
* Launches all the tasks in order.