You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2007/09/23 04:30:47 UTC

svn commit: r578543 - in /lucene/hadoop/trunk: CHANGES.txt src/c++/pipes/impl/HadoopPipes.cc src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Author: ddas
Date: Sat Sep 22 19:30:46 2007
New Revision: 578543

URL: http://svn.apache.org/viewvc?rev=578543&view=rev
Log:
HADOOP-1573. Support for 0 reducers in PIPES.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=578543&r1=578542&r2=578543&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Sat Sep 22 19:30:46 2007
@@ -87,6 +87,9 @@
 
   BUG FIXES
 
+    HADOOP-1573. Support for 0 reducers in PIPES. 
+    (Owen O'Malley via devaraj)
+
     HADOOP-1500. Fix typographical errors in the DFS WebUI.
     (Nigel Daley via dhruba)
 

Modified: lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=578543&r1=578542&r2=578543&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc Sat Sep 22 19:30:46 2007
@@ -628,9 +628,11 @@
         value = new string();
       }
       mapper = factory->createMapper(*this);
-      reducer = factory->createCombiner(*this);
-      partitioner = factory->createPartitioner(*this);
       numReduces = _numReduces;
+      if (numReduces != 0) { 
+        reducer = factory->createCombiner(*this);
+        partitioner = factory->createPartitioner(*this);
+      }
       if (reducer != NULL) {
         int64_t spillSize = 100;
         if (jobConf->hasKey("io.sort.mb")) {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=578543&r1=578542&r2=578543&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Sat Sep 22 19:30:46 2007
@@ -61,11 +61,15 @@
       mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
       writeInputFile(fs, inputPath);
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
-                 inputPath, outputPath, twoSplitOutput);
+                 inputPath, outputPath, 3, 2, twoSplitOutput);
+      FileUtil.fullyDelete(fs, outputPath);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+                 inputPath, outputPath, 3, 0, noSortOutput);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
-                 inputPath, outputPath, fixedPartitionOutput);
+                 inputPath, outputPath, 3, 2, fixedPartitionOutput);
       runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
       mr.waitUntilIdle();
     } finally {
@@ -84,6 +88,20 @@
     "into\t1\nis\t1\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\ntired\t1\n" +
     "twice\t1\nvery\t1\nwhat\t1\n"
   };
+
+  final static String[] noSortOutput = new String[] {
+    "it,\t1\n`and\t1\nwhat\t1\nis\t1\nthe\t1\nuse\t1\nof\t1\na\t1\n" +
+    "book,'\t1\nthought\t1\nAlice\t1\n`without\t1\npictures\t1\nor\t1\n"+
+    "conversation?'\t1\n",
+
+    "Alice\t1\nwas\t1\nbeginning\t1\nto\t1\nget\t1\nvery\t1\ntired\t1\n"+
+    "of\t1\nsitting\t1\nby\t1\nher\t1\nsister\t1\non\t1\nthe\t1\nbank,\t1\n"+
+    "and\t1\nof\t1\nhaving\t1\nnothing\t1\nto\t1\ndo:\t1\nonce\t1\n", 
+
+    "or\t1\ntwice\t1\nshe\t1\nhad\t1\npeeped\t1\ninto\t1\nthe\t1\nbook\t1\n"+
+    "her\t1\nsister\t1\nwas\t1\nreading,\t1\nbut\t1\nit\t1\nhad\t1\nno\t1\n"+
+    "pictures\t1\nor\t1\nconversations\t1\nin\t1\n"
+  };
   
   final static String[] fixedPartitionOutput = new String[] {
     "Alice\t2\n`and\t1\n`without\t1\na\t1\nand\t1\nbank,\t1\nbeginning\t1\n" +
@@ -110,14 +128,14 @@
 
   private void runProgram(MiniMRCluster mr, FileSystem fs, 
                           Path program, Path inputPath, Path outputPath,
-                          String[] expectedResults
+                          int numMaps, int numReduces, String[] expectedResults
                          ) throws IOException {
     Path wordExec = new Path("/testing/bin/application");
     FileUtil.fullyDelete(fs, wordExec.getParent());
     fs.copyFromLocalFile(program, wordExec);                                         
     JobConf job = mr.createJobConf();
-    job.setNumMapTasks(3);
-    job.setNumReduceTasks(expectedResults.length);
+    job.setNumMapTasks(numMaps);
+    job.setNumReduceTasks(numReduces);
     Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
     Submitter.setIsJavaRecordReader(job, true);
     Submitter.setIsJavaRecordWriter(job, true);