You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 06:22:22 UTC
svn commit: r725908 - in /hadoop/core/branches/branch-0.19: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Thu Dec 11 21:22:20 2008
New Revision: 725908
URL: http://svn.apache.org/viewvc?rev=725908&view=rev
Log:
Merge -r 725906:725907 and 725904:725905 from trunk onto 0.19 branch. Fixes HADOOP-4620.
Added:
hadoop/core/branches/branch-0.19/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
- copied unchanged from r725907, hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Dec 11 21:22:20 2008
@@ -1087,6 +1087,9 @@
HADOOP-4795. Prevent lease monitor getting into an infinite loop when
leases and the namespace tree does not match. (szetszwo)
+ HADOOP-4620. Fixes Streaming to properly handle the cases of map/reduce with
+ empty input/output. (Ravi Gummadi via ddas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Dec 11 21:22:20 2008
@@ -288,6 +288,22 @@
void waitOutputThreads() {
try {
+ if (outThread_ == null) {
+ // This happens only when the reducer has empty input (so reduce() is
+ // not called at all in this task). A reducer that still generates output
+ // in that case is very uncommon, and we may not have to support it.
+ // So we don't write this output to HDFS, but we do consume/collect
+ // it, just to avoid the reducer hanging forever.
+
+ OutputCollector collector = new OutputCollector() {
+ public void collect(Object key, Object value)
+ throws IOException {
+ //just consume it, no need to write the record anywhere
+ }
+ };
+ Reporter reporter = Reporter.NULL;//dummy reporter
+ startOutputThreads(collector, reporter);
+ }
int exitVal = sim.waitFor();
// how'd it go?
if (exitVal != 0) {
@@ -506,9 +522,11 @@
}
public void mapRedFinished() {
- logprintln("mapRedFinished");
try {
- if (!doPipe_) return;
+ if (!doPipe_) {
+ logprintln("mapRedFinished");
+ return;
+ }
try {
if (clientOut_ != null) {
clientOut_.flush();
@@ -518,6 +536,7 @@
}
waitOutputThreads();
if (sim != null) sim.destroy();
+ logprintln("mapRedFinished");
} catch (RuntimeException e) {
logStackTrace(e);
throw e;
Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Dec 11 21:22:20 2008
@@ -82,10 +82,6 @@
// (MapRed creates it reflectively)
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
- // init
- if (outThread_ == null) {
- startOutputThreads(output, reporter);
- }
if (outerrThreadsThrowable != null) {
mapRedFinished();
throw new IOException ("MROutput/MRErrThread failed:"
Modified: hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Dec 11 21:22:20 2008
@@ -730,6 +730,7 @@
jobConf_.setMapperClass(c);
} else {
jobConf_.setMapperClass(PipeMapper.class);
+ jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
URLEncoder.encode(mapCmd_, "UTF-8"));
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=725908&r1=725907&r2=725908&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapRunner.java Thu Dec 11 21:22:20 2008
@@ -58,4 +58,7 @@
}
}
+ protected Mapper<K1, V1, K2, V2> getMapper() {
+ return mapper;
+ }
}