You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 06:32:08 UTC

svn commit: r725912 - in /hadoop/core/branches/branch-0.18: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Thu Dec 11 21:32:06 2008
New Revision: 725912

URL: http://svn.apache.org/viewvc?rev=725912&view=rev
Log:
Merge -r  725904:725905 725906:725907 725909:725910 from trunk onto 0.18 branch. Fixes HADOOP-4620.

Added:
    hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
      - copied unchanged from r725910, hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java
    hadoop/core/branches/branch-0.18/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
      - copied unchanged from r725907, hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapRunner.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=725912&r1=725911&r2=725912&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Thu Dec 11 21:32:06 2008
@@ -99,6 +99,9 @@
 
     HADOOP-4824. Should not use File.setWritable() in 0.18. (hairong)
 
+    HADOOP-4620. Fixes Streaming to correctly handle map/reduce tasks with
+    empty input/output. (Ravi Gummadi via ddas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=725912&r1=725911&r2=725912&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Dec 11 21:32:06 2008
@@ -294,6 +294,22 @@
 
   void waitOutputThreads() {
     try {
+      if (outThread_ == null) {
+        // This happens only when the reducer has empty input (so reduce() is
+        // never called in this task). A reducer that still generates output
+        // in that case is very uncommon, and we may not need to support it,
+        // so we don't write this output to HDFS; we just consume/collect it
+        // to keep the reducer from hanging forever.
+
+        OutputCollector collector = new OutputCollector() {
+          public void collect(Object key, Object value)
+            throws IOException {
+            //just consume it, no need to write the record anywhere
+          }
+        };
+        Reporter reporter = Reporter.NULL;//dummy reporter
+        startOutputThreads(collector, reporter);
+      }
       int exitVal = sim.waitFor();
       // how'd it go?
       if (exitVal != 0) {
@@ -505,9 +521,11 @@
   }
 
   public void mapRedFinished() {
-    logprintln("mapRedFinished");
     try {
-      if (!doPipe_) return;
+      if (!doPipe_) {
+        logprintln("mapRedFinished");
+        return;
+      }
       try {
         if (clientOut_ != null) {
           clientOut_.flush();
@@ -517,6 +535,7 @@
       }
       waitOutputThreads();
       if (sim != null) sim.destroy();
+      logprintln("mapRedFinished");
     } catch (RuntimeException e) {
       logStackTrace(e);
       throw e;

Modified: hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=725912&r1=725911&r2=725912&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Dec 11 21:32:06 2008
@@ -63,10 +63,6 @@
   // (MapRed creates it reflectively)
 
   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
-    // init
-    if (outThread_ == null) {
-      startOutputThreads(output, reporter);
-    }
     if (outerrThreadsThrowable != null) {
       mapRedFinished();
       throw new IOException ("MROutput/MRErrThread failed:"

Modified: hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=725912&r1=725911&r2=725912&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Dec 11 21:32:06 2008
@@ -751,6 +751,7 @@
         jobConf_.setMapperClass(c);
       } else {
         jobConf_.setMapperClass(PipeMapper.class);
+        jobConf_.setMapRunnerClass(PipeMapRunner.class);
         jobConf_.set("stream.map.streamprocessor", 
                      URLEncoder.encode(mapCmd_, "UTF-8"));
       }

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=725912&r1=725911&r2=725912&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapRunner.java Thu Dec 11 21:32:06 2008
@@ -51,4 +51,7 @@
     }
   }
 
+  protected Mapper<K1, V1, K2, V2> getMapper() {
+    return mapper;
+  }
 }