You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 06:22:22 UTC

svn commit: r725908 - in /hadoop/core/branches/branch-0.19: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Thu Dec 11 21:22:20 2008
New Revision: 725908

URL: http://svn.apache.org/viewvc?rev=725908&view=rev
Log:
Merge -r 725906:725907 and 725904:725905 from trunk onto 0.19 branch. Fixes HADOOP-4620.

Added:
    hadoop/core/branches/branch-0.19/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
      - copied unchanged from r725907, hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Dec 11 21:22:20 2008
@@ -1087,6 +1087,9 @@
     HADOOP-4795. Prevent lease monitor getting into an infinite loop when
     leases and the namespace tree does not match. (szetszwo)
 
+    HADOOP-4620. Fixes Streaming to correctly handle map/reduce tasks with empty
+    input/output. (Ravi Gummadi via ddas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Dec 11 21:22:20 2008
@@ -288,6 +288,22 @@
 
   void waitOutputThreads() {
     try {
+      if (outThread_ == null) {
+        // This happens only when the reducer has empty input (so reduce() is
+        // never called in this task). A reducer that still generates output
+        // in that case is very uncommon, and we may not need to support it.
+        // So we don't write this output to HDFS; we only consume/collect it
+        // to avoid the reducer hanging forever.
+
+        OutputCollector collector = new OutputCollector() {
+          public void collect(Object key, Object value)
+            throws IOException {
+            //just consume it, no need to write the record anywhere
+          }
+        };
+        Reporter reporter = Reporter.NULL;//dummy reporter
+        startOutputThreads(collector, reporter);
+      }
       int exitVal = sim.waitFor();
       // how'd it go?
       if (exitVal != 0) {
@@ -506,9 +522,11 @@
   }
 
   public void mapRedFinished() {
-    logprintln("mapRedFinished");
     try {
-      if (!doPipe_) return;
+      if (!doPipe_) {
+        logprintln("mapRedFinished");
+        return;
+      }
       try {
         if (clientOut_ != null) {
           clientOut_.flush();
@@ -518,6 +536,7 @@
       }
       waitOutputThreads();
       if (sim != null) sim.destroy();
+      logprintln("mapRedFinished");
     } catch (RuntimeException e) {
       logStackTrace(e);
       throw e;

Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Dec 11 21:22:20 2008
@@ -82,10 +82,6 @@
   // (MapRed creates it reflectively)
 
   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
-    // init
-    if (outThread_ == null) {
-      startOutputThreads(output, reporter);
-    }
     if (outerrThreadsThrowable != null) {
       mapRedFinished();
       throw new IOException ("MROutput/MRErrThread failed:"

Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Dec 11 21:22:20 2008
@@ -730,6 +730,7 @@
         jobConf_.setMapperClass(c);
       } else {
         jobConf_.setMapperClass(PipeMapper.class);
+        jobConf_.setMapRunnerClass(PipeMapRunner.class);
         jobConf_.set("stream.map.streamprocessor", 
                      URLEncoder.encode(mapCmd_, "UTF-8"));
       }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java Thu Dec 11 21:22:20 2008
@@ -58,4 +58,7 @@
     }
   }
 
+  protected Mapper<K1, V1, K2, V2> getMapper() {
+    return mapper;
+  }
 }