You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/05 07:09:54 UTC
svn commit: r960446 - in /hadoop/mapreduce/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/test/mapred/org/apache/hadoop/mapred/
Author: amareshwari
Date: Mon Jul 5 05:09:53 2010
New Revision: 960446
URL: http://svn.apache.org/viewvc?rev=960446&view=rev
Log:
MAPREDUCE-1888. Fixes Streaming to override output key and value types, only if mapper/reducer is a command. Contributed by Ravi Gummadi
Added:
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 5 05:09:53 2010
@@ -137,6 +137,9 @@ Trunk (unreleased changes)
MAPREDUCE-1864. Removes uninitialized/unused variables in
org.apache.hadoop.streaming.PipeMapRed. (amareshwari)
+ MAPREDUCE-1888. Fixes Streaming to override output key and value types,
+ only if mapper/reducer is a command. (Ravi Gummadi via amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Jul 5 05:09:53 2010
@@ -744,25 +744,15 @@ public class StreamJob implements Tool {
jobConf_.setClass("stream.reduce.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
- idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
- jobConf_.setClass("stream.map.output.reader.class",
- idResolver.getOutputReaderClass(), OutputReader.class);
- jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
- jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
-
- idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
- jobConf_.setClass("stream.reduce.output.reader.class",
- idResolver.getOutputReaderClass(), OutputReader.class);
- jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
- jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
-
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
+ boolean isMapperACommand = false;
if (mapCmd_ != null) {
c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
if (c != null) {
jobConf_.setMapperClass(c);
} else {
+ isMapperACommand = true;
jobConf_.setMapperClass(PipeMapper.class);
jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
@@ -781,25 +771,62 @@ public class StreamJob implements Tool {
}
}
- boolean reducerNone_ = false;
- if (redCmd_ != null) {
- reducerNone_ = redCmd_.equals(REDUCE_NONE);
- if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
- jobConf_.setReducerClass(ValueAggregatorReducer.class);
- jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
- } else {
+ if (numReduceTasksSpec_!= null) {
+ int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
+ jobConf_.setNumReduceTasks(numReduceTasks);
+ }
- c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
- if (c != null) {
- jobConf_.setReducerClass(c);
+ boolean isReducerACommand = false;
+ if (redCmd_ != null) {
+ if (redCmd_.equals(REDUCE_NONE)) {
+ jobConf_.setNumReduceTasks(0);
+ }
+ if (jobConf_.getNumReduceTasks() != 0) {
+ if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+ jobConf_.setReducerClass(ValueAggregatorReducer.class);
+ jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
} else {
- jobConf_.setReducerClass(PipeReducer.class);
- jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
- redCmd_, "UTF-8"));
+
+ c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
+ if (c != null) {
+ jobConf_.setReducerClass(c);
+ } else {
+ isReducerACommand = true;
+ jobConf_.setReducerClass(PipeReducer.class);
+ jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+ redCmd_, "UTF-8"));
+ }
}
}
}
+ idResolver.resolve(jobConf_.get("stream.map.output",
+ IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ if (isMapperACommand) {
+ // if mapper is a command, then map output key/value classes come from the
+ // idResolver
+ jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+
+ if (jobConf_.getNumReduceTasks() == 0) {
+ jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+ }
+ }
+
+ idResolver.resolve(jobConf_.get("stream.reduce.output",
+ IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ if (isReducerACommand) {
+ // if reducer is a command, then output key/value classes come from the
+ // idResolver
+ jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+ }
+
if (inReaderSpec_ != null) {
String[] args = inReaderSpec_.split(",");
String readerClass = args[0];
@@ -846,14 +873,6 @@ public class StreamJob implements Tool {
}
}
- if (numReduceTasksSpec_!= null) {
- int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
- jobConf_.setNumReduceTasks(numReduceTasks);
- }
- if (reducerNone_) {
- jobConf_.setNumReduceTasks(0);
- }
-
if(mapDebugSpec_ != null){
jobConf_.setMapDebugScript(mapDebugSpec_);
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java Mon Jul 5 05:09:53 2010
@@ -21,13 +21,14 @@ package org.apache.hadoop.streaming;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
-import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.After;
+import org.junit.Before;
/**
* This class tests that the '-file' argument to streaming results
@@ -59,8 +60,13 @@ public class TestFileArgs extends TestSt
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
strNamenode = "fs.default.name=hdfs://" + namenode;
+ map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
+ }
+ @Before
+ @Override
+ public void setUp() throws IOException {
// Set up side file
FileSystem localFs = FileSystem.getLocal(conf);
DataOutputStream dos = localFs.create(new Path("sidefile"));
@@ -68,6 +74,17 @@ public class TestFileArgs extends TestSt
dos.close();
}
+ @After
+ @Override
+ public void tearDown() {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+
@Override
protected String getExpectedOutput() {
return EXPECTED_OUTPUT;
@@ -80,22 +97,14 @@ public class TestFileArgs extends TestSt
@Override
protected String[] genArgs() {
- return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-file", new java.io.File("sidefile").getAbsolutePath(),
- "-mapper", LS_PATH,
- "-numReduceTasks", "0",
- "-jobconf", strNamenode,
- "-jobconf", strJobTracker,
- "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
- };
+ args.add("-file");
+ args.add(new java.io.File("sidefile").getAbsolutePath());
+ args.add("-numReduceTasks");
+ args.add("0");
+ args.add("-jobconf");
+ args.add(strNamenode);
+ args.add("-jobconf");
+ args.add(strJobTracker);
+ return super.genArgs();
}
-
-
- public static void main(String[]args) throws Exception
- {
- new TestFileArgs().testCommandLine();
- }
-
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Jul 5 05:09:53 2010
@@ -40,21 +40,4 @@ public class TestGzipInput extends TestS
out.write(input.getBytes("UTF-8"));
out.close();
}
-
-
- protected String[] genArgs() {
- return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-reducer", reduce,
- };
-
- }
-
- public static void main(String[]args) throws Exception
- {
- new TestGzipInput().testCommandLine();
- }
-
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java Mon Jul 5 05:09:53 2010
@@ -82,6 +82,15 @@ public class TestMultipleArchiveFiles ex
mr = new MiniMRCluster(1, namenode, 3);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
strNamenode = "fs.default.name=" + namenode;
+
+ map = "xargs cat";
+ reduce = "cat";
+ }
+
+ @Override
+ protected void setInputOutput() {
+ inputFile = INPUT_FILE;
+ outDir = OUTPUT_DIR;
}
protected void createInput() throws IOException
@@ -114,30 +123,20 @@ public class TestMultipleArchiveFiles ex
String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
- return new String[] {
- "-input", INPUT_FILE.toString(),
- "-output", OUTPUT_DIR,
- "-mapper", "xargs cat",
- "-reducer", "cat",
- "-jobconf", "mapreduce.job.reduces=1",
- "-cacheArchive", cache1,
- "-cacheArchive", cache2,
- "-jobconf", strNamenode,
- "-jobconf", strJobTracker,
- "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
- };
+ args.add("-jobconf");
+ args.add("mapreduce.job.reduces=1");
+ args.add("-cacheArchive");
+ args.add(cache1);
+ args.add("-cacheArchive");
+ args.add(cache2);
+ args.add("-jobconf");
+ args.add(strNamenode);
+ args.add("-jobconf");
+ args.add(strJobTracker);
+ return super.genArgs();
}
- //@Test
- public void testCommandLine() throws Exception {
- createInput();
- String args[] = genArgs();
- LOG.info("Testing streaming command line:\n" +
- StringUtils.join(" ", Arrays.asList(args)));
- job = new StreamJob(genArgs(), true);
- if(job.go() != 0) {
- throw new Exception("Job Failed");
- }
+ protected void checkOutput() throws IOException {
StringBuffer output = new StringBuffer(256);
Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
new Path(OUTPUT_DIR)));
@@ -147,9 +146,4 @@ public class TestMultipleArchiveFiles ex
}
assertEquals(expectedOutput, output.toString());
}
-
- public static void main(String[]args) throws Exception
- {
- new TestMultipleArchiveFiles().testCommandLine();
- }
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java Mon Jul 5 05:09:53 2010
@@ -22,26 +22,23 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import org.apache.hadoop.fs.FileUtil;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
/**
* This class tests StreamXmlRecordReader
* The test creates an XML file, uses StreamXmlRecordReader and compares
* the expected output against the generated output
*/
-public class TestStreamXmlRecordReader extends TestStreaming
-{
-
- private StreamJob job;
+public class TestStreamXmlRecordReader extends TestStreaming {
public TestStreamXmlRecordReader() throws IOException {
INPUT_FILE = new File("input.xml");
- input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\nbunnies.are.pink\t\n</xmltag>\t\n";
+ input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\n" +
+ "bunnies.are.pink\t\n</xmltag>\t\n";
+ map = "cat";
+ reduce = "NONE";
+ outputExpect = input;
}
-
+
+ @Override
protected void createInput() throws IOException
{
FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
@@ -53,42 +50,10 @@ public class TestStreamXmlRecordReader e
out.close();
}
+ @Override
protected String[] genArgs() {
- return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper","cat",
- "-reducer", "NONE",
- "-inputreader", "StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>"
- };
- }
-
- @Test
- public void testCommandLine() throws Exception {
- try {
- try {
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (Exception e) {
- }
- createInput();
- job = new StreamJob(genArgs(), false);
- job.go();
- File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
- String output = StreamUtil.slurp(outFile);
- outFile.delete();
- assertEquals(input, output);
- } finally {
- try {
- INPUT_FILE.delete();
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String[]args) throws Exception
- {
- new TestStreamXmlRecordReader().testCommandLine();
+ args.add("-inputreader");
+ args.add("StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>");
+ return super.genArgs();
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Jul 5 05:09:53 2010
@@ -19,11 +19,13 @@
package org.apache.hadoop.streaming;
import java.io.*;
+import java.util.ArrayList;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,8 @@ public class TestStreaming
protected File TEST_DIR;
protected File INPUT_FILE;
protected File OUTPUT_DIR;
+ protected String inputFile;
+ protected String outDir;
protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
// map behaves like "/usr/bin/tr . \\n"; (split words into lines)
protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
@@ -48,6 +52,7 @@ public class TestStreaming
protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
+ protected ArrayList<String> args = new ArrayList<String>();
protected StreamJob job;
public TestStreaming() throws IOException
@@ -60,6 +65,17 @@ public class TestStreaming
INPUT_FILE = new File(TEST_DIR, "input.txt");
}
+ @Before
+ public void setUp() throws IOException {
+ UtilTest.recursiveDelete(TEST_DIR);
+ assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+ }
+
+ @After
+ public void tearDown() {
+ UtilTest.recursiveDelete(TEST_DIR);
+ }
+
protected String getInputData() {
return input;
}
@@ -72,15 +88,25 @@ public class TestStreaming
out.close();
}
+ protected void setInputOutput() {
+ inputFile = INPUT_FILE.getAbsolutePath();
+ outDir = OUTPUT_DIR.getAbsolutePath();
+ }
+
protected String[] genArgs() {
- return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-reducer", reduce,
- "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
- };
+ setInputOutput();
+ args.add("-input");args.add(inputFile);
+ args.add("-output");args.add(outDir);
+ args.add("-mapper");args.add(map);
+ args.add("-reducer");args.add(reduce);
+ args.add("-jobconf");
+ args.add("mapreduce.task.files.preserve.failedtasks=true");
+ args.add("-jobconf");
+ args.add("stream.tmpdir="+System.getProperty("test.build.data","/tmp"));
+
+ String str[] = new String [args.size()];
+ args.toArray(str);
+ return str;
}
protected Configuration getConf() {
@@ -105,25 +131,26 @@ public class TestStreaming
assertEquals(getExpectedOutput(), output);
}
- @Test
- public void testCommandLine() throws Exception
- {
- UtilTest.recursiveDelete(TEST_DIR);
- assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+ /**
+ * Runs a streaming job with the given arguments
+ * @return the streaming job return status
+ * @throws IOException
+ */
+ protected int runStreamJob() throws IOException {
createInput();
boolean mayExit = false;
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
job = new StreamJob(genArgs(), mayExit);
- int ret = job.go();
- assertEquals(0, ret);
- checkOutput();
+ return job.go();
}
- public static void main(String[]args) throws Exception
+ @Test
+ public void testCommandLine() throws Exception
{
- new TestStreaming().testCommandLine();
+ int ret = runStreamJob();
+ assertEquals(0, ret);
+ checkOutput();
}
-
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java Mon Jul 5 05:09:53 2010
@@ -27,25 +27,21 @@ import static org.junit.Assert.*;
public class TestStreamingCombiner extends TestStreaming {
- protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{""});
+ protected String combine = StreamUtil.makeJavaCommand(
+ UniqApp.class, new String[]{""});
public TestStreamingCombiner() throws IOException {
super();
}
protected String[] genArgs() {
- return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-reducer", reduce,
- "-combiner", combine,
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
- };
+ args.add("-combiner");
+ args.add(combine);
+ return super.genArgs();
}
@Test
- public void testCommandLine() throws Exception {
+ public void testCommandLine() throws Exception {
super.testCommandLine();
// validate combiner counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
@@ -55,10 +51,4 @@ public class TestStreamingCombiner exten
assertTrue(counters.findCounter(
counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
}
-
- public static void main(String[]args) throws Exception
- {
- new TestStreamingCombiner().testCommandLine();
- }
-
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java Mon Jul 5 05:09:53 2010
@@ -21,10 +21,8 @@ package org.apache.hadoop.streaming;
import org.junit.Test;
import static org.junit.Assert.*;
-import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
@@ -38,41 +36,18 @@ public class TestStreamingCounters exten
}
@Test
- public void testCommandLine() throws IOException
- {
- try {
- try {
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (Exception e) {
- }
-
- createInput();
- boolean mayExit = false;
-
- // During tests, the default Configuration will use a local mapred
- // So don't specify -config or -cluster
- StreamJob job = new StreamJob(genArgs(), mayExit);
- job.go();
- File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
- String output = StreamUtil.slurp(outFile);
- outFile.delete();
- assertEquals(outputExpect, output);
-
- Counters counters = job.running_.getCounters();
- assertNotNull("Counters", counters);
- Group group = counters.getGroup("UserCounters");
- assertNotNull("Group", group);
- Counter counter = group.getCounterForName("InputLines");
- assertNotNull("Counter", counter);
- assertEquals(3, counter.getCounter());
- } finally {
- try {
- INPUT_FILE.delete();
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
+ public void testCommandLine() throws Exception {
+ super.testCommandLine();
+ validateCounters();
}
+ private void validateCounters() throws IOException {
+ Counters counters = job.running_.getCounters();
+ assertNotNull("Counters", counters);
+ Group group = counters.getGroup("UserCounters");
+ assertNotNull("Group", group);
+ Counter counter = group.getCounterForName("InputLines");
+ assertNotNull("Counter", counter);
+ assertEquals(3, counter.getCounter());
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Mon Jul 5 05:09:53 2010
@@ -19,14 +19,10 @@
package org.apache.hadoop.streaming;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import java.io.File;
+import java.io.IOException;
/**
* This class tests if hadoopStreaming returns Exception
@@ -37,55 +33,23 @@ import org.apache.hadoop.fs.Path;
public class TestStreamingFailure extends TestStreaming
{
- protected File INVALID_INPUT_FILE;// = new File("invalid_input.txt");
- private StreamJob job;
+ protected File INVALID_INPUT_FILE;
public TestStreamingFailure() throws IOException
{
INVALID_INPUT_FILE = new File("invalid_input.txt");
}
- protected String[] genArgs() {
- return new String[] {
- "-input", INVALID_INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-reducer", reduce,
- "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
- };
+ @Override
+ protected void setInputOutput() {
+ inputFile = INVALID_INPUT_FILE.getAbsolutePath();
+ outDir = OUTPUT_DIR.getAbsolutePath();
}
+ @Override
@Test
- public void testCommandLine()
- {
- try {
- try {
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (Exception e) {
- }
-
- boolean mayExit = false;
- int returnStatus = 0;
-
- // During tests, the default Configuration will use a local mapred
- // So don't specify -config or -cluster
- job = new StreamJob(genArgs(), mayExit);
- returnStatus = job.go();
- assertEquals("Streaming Job Failure code expected", 5, returnStatus);
- } catch(Exception e) {
- // Expecting an exception
- } finally {
- try {
- FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String[]args) throws Exception
- {
- new TestStreamingFailure().testCommandLine();
+ public void testCommandLine() throws IOException {
+ int returnStatus = runStreamJob();
+ assertEquals("Streaming Job Failure code expected", 5, returnStatus);
}
}
Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java?rev=960446&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java Mon Jul 5 05:09:53 2010
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Tests stream job with java tasks, commands in MapReduce local mode.
+ * Validates if user-set config properties
+ * {@link MRJobConfig#MAP_OUTPUT_KEY_CLASS} and
+ * {@link MRJobConfig#OUTPUT_KEY_CLASS} are honored by streaming jobs.
+ */
+public class TestStreamingOutputKeyValueTypes extends TestStreaming {
+
+ public TestStreamingOutputKeyValueTypes() throws IOException {
+ super();
+ input = "one line dummy input\n";
+ }
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ args.clear();
+ super.setUp();
+ }
+
+ @Override
+ protected String[] genArgs() {
+ // set the testcase-specific config properties first and the remaining
+ // arguments are set in TestStreaming.genArgs().
+ args.add("-jobconf");
+ args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS +
+ "=org.apache.hadoop.io.LongWritable");
+ args.add("-jobconf");
+ args.add(MRJobConfig.OUTPUT_KEY_CLASS +
+ "=org.apache.hadoop.io.LongWritable");
+
+ // Using SequenceFileOutputFormat here because with TextOutputFormat, the
+ // mapred.output.key.class set in JobConf (which we want to test here) is
+ // not read/used at all.
+ args.add("-outputformat");
+ args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");
+
+ return super.genArgs();
+ }
+
+ @Override
+ protected void checkOutput() throws IOException {
+ // No need to validate output for the test cases in this class
+ }
+
+ public static class MyReducer<K, V>
+ extends MapReduceBase implements Reducer<K, V, LongWritable, Text> {
+
+ public void reduce(K key, Iterator<V> values,
+ OutputCollector<LongWritable, Text> output, Reporter reporter)
+ throws IOException {
+ LongWritable l = new LongWritable();
+ while (values.hasNext()) {
+ output.collect(l, new Text(values.next().toString()));
+ }
+ }
+ }
+
+ // Check with Java Mapper, Java Reducer
+ @Test
+ public void testJavaMapperAndJavaReducer() throws Exception {
+ map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+ reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
+ super.testCommandLine();
+ }
+
+ // Check with Java Mapper, Java Reducer and -numReduceTasks 0
+ @Test
+ public void testJavaMapperAndJavaReducerAndZeroReduces() throws Exception {
+ map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+ reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
+ args.add("-numReduceTasks");
+ args.add("0");
+ super.testCommandLine();
+ }
+
+ // Check with Java Mapper, Reducer = "NONE"
+ @Test
+ public void testJavaMapperWithReduceNone() throws Exception {
+ map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+ reduce = "NONE";
+ super.testCommandLine();
+ }
+
+ // Check with Java Mapper, command Reducer
+ @Test
+ public void testJavaMapperAndCommandReducer() throws Exception {
+ map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+ reduce = "cat";
+ super.testCommandLine();
+ }
+
+ // Check with Java Mapper, command Reducer and -numReduceTasks 0
+ @Test
+ public void testJavaMapperAndCommandReducerAndZeroReduces() throws Exception {
+ map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+ reduce = "cat";
+ args.add("-numReduceTasks");
+ args.add("0");
+ super.testCommandLine();
+ }
+
+ // Check with Command Mapper, Java Reducer
+ @Test
+ public void testCommandMapperAndJavaReducer() throws Exception {
+ map = "cat";
+ reduce = MyReducer.class.getName();
+ super.testCommandLine();
+ }
+
+ // Check with Command Mapper, Java Reducer and -numReduceTasks 0
+ @Test
+ public void testCommandMapperAndJavaReducerAndZeroReduces() throws Exception {
+ map = "cat";
+ reduce = MyReducer.class.getName();
+ args.add("-numReduceTasks");
+ args.add("0");
+ super.testCommandLine();
+ }
+
+ // Check with Command Mapper, Reducer = "NONE"
+ @Test
+ public void testCommandMapperWithReduceNone() throws Exception {
+ map = "cat";
+ reduce = "NONE";
+ super.testCommandLine();
+ }
+
+ // Check with Command Mapper, Command Reducer
+ @Test
+ public void testCommandMapperAndCommandReducer() throws Exception {
+ map = "cat";
+ reduce = "cat";
+ super.testCommandLine();
+ }
+
+ // Check with Command Mapper, Command Reducer and -numReduceTasks 0
+ @Test
+ public void testCommandMapperAndCommandReducerAndZeroReduces()
+ throws Exception {
+ map = "cat";
+ reduce = "cat";
+ args.add("-numReduceTasks");
+ args.add("0");
+ super.testCommandLine();
+ }
+
+ @Override
+ @Test
+ public void testCommandLine() {
+ // Do nothing
+ }
+}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Jul 5 05:09:53 2010
@@ -23,8 +23,9 @@ import java.io.*;
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
- Used to test the usage of external applications without adding
- platform-specific dependencies.
+ * Used to test the usage of external applications without adding
+ * platform-specific dependencies.
+ * Use TrApp as mapper only. For reducer, use TrAppReduce.
*/
public class TrApp
{
@@ -43,8 +44,8 @@ public class TrApp
// property names have been escaped in PipeMapRed.safeEnvVarName()
expectDefined("mapreduce_cluster_local_dir");
expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
- expect("mapreduce_job_output_key_class", "org.apache.hadoop.io.Text");
- expect("mapreduce_job_output_value_class", "org.apache.hadoop.io.Text");
+ expect("mapreduce_map_output_key_class", "org.apache.hadoop.io.Text");
+ expect("mapreduce_map_output_value_class", "org.apache.hadoop.io.Text");
expect("mapreduce_task_ismap", "true");
expectDefined("mapreduce_task_attempt_id");
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java Mon Jul 5 05:09:53 2010
@@ -23,8 +23,9 @@ import java.io.*;
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
- Used to test the usage of external applications without adding
- platform-specific dependencies.
+ * Used to test the usage of external applications without adding
+ * platform-specific dependencies.
+ * Use TrAppReduce as reducer only. For mapper, use TrApp.
*/
public class TrAppReduce
{
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java Mon Jul 5 05:09:53 2010
@@ -67,9 +67,6 @@ public class TestFileOutputFormat extend
conf.setJobName("fof");
conf.setInputFormat(TextInputFormat.class);
- conf.setOutputKeyClass(LongWritable.class);
- conf.setOutputValueClass(Text.class);
-
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);