You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/23 09:22:04 UTC

svn commit: r746918 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/lib/ src/mapred/org/apache/hadoop/mapred/pipes/ src/mapred/org/apache/hadoop/mapreduce/lib/output/ src/test/...

Author: ddas
Date: Mon Feb 23 08:22:04 2009
New Revision: 746918

URL: http://svn.apache.org/viewvc?rev=746918&view=rev
Log:
HADOOP-4927. Adds a generic wrapper around outputformat to allow creation of output on demand. Contributed by Jothi Padmanabhan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/FilterOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LazyOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLazyOutput.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746918&r1=746917&r2=746918&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Feb 23 08:22:04 2009
@@ -46,6 +46,9 @@
     HADOOP-5052. Add an example computing exact digits of pi using the
     Bailey-Borwein-Plouffe algorithm. (Tsz Wo (Nicholas), SZE via cdouglas)
 
+    HADOOP-4927. Adds a generic wrapper around outputformat to allow creation of
+    output on demand (Jothi Padmanabhan via ddas)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=746918&r1=746917&r2=746918&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Feb 23 08:22:04 2009
@@ -60,11 +60,13 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.LazyOutputFormat;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
 import org.apache.hadoop.streaming.io.IdentifierResolver;
@@ -271,6 +273,8 @@
       comCmd_ = (String)cmdLine.getValue("-combiner"); 
       redCmd_ = (String)cmdLine.getValue("-reducer"); 
       
+      lazyOutput_ = cmdLine.hasOption("-lazyOutput");
+      
       if(!cmdLine.getValues("-file").isEmpty()) {
         packageFiles_.addAll(cmdLine.getValues("-file"));
       }
@@ -468,6 +472,7 @@
     Option help = createBoolOption("help", "print this help message"); 
     Option debug = createBoolOption("debug", "print debug output"); 
     Option inputtagged = createBoolOption("inputtagged", "inputtagged"); 
+    Option lazyOutput = createBoolOption("lazyOutput", "create outputs lazily");
     
     allOptions = new GroupBuilder().
       withOption(input).
@@ -496,6 +501,7 @@
       withOption(debug).
       withOption(inputtagged).
       withOption(help).
+      withOption(lazyOutput).
       create();
     parser.setGroup(allOptions);
     
@@ -525,6 +531,7 @@
     System.out.println("  -reducedebug <path>  Optional." +
     " To run this script when a reduce task fails ");
     System.out.println("  -io <identifier>  Optional.");
+    System.out.println("  -lazyOutput Optional. Lazily create Output");
     System.out.println("  -verbose");
     System.out.println();
     GenericOptionsParser.printGenericCommandUsage(System.out);
@@ -852,7 +859,11 @@
     if (fmt == null) {
       fmt = TextOutputFormat.class;
     }
-    jobConf_.setOutputFormat(fmt);
+    if (lazyOutput_) {
+      LazyOutputFormat.setOutputFormatClass(jobConf_, fmt);
+    } else {
+      jobConf_.setOutputFormat(fmt);
+    }
 
     if (partitionerSpec_!= null) {
       c = StreamUtil.goodClassOrNull(jobConf_, partitionerSpec_, defaultPackage);
@@ -1100,6 +1111,7 @@
   protected String mapDebugSpec_;
   protected String reduceDebugSpec_;
   protected String ioSpec_;
+  protected boolean lazyOutput_;
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/FilterOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/FilterOutputFormat.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/FilterOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/FilterOutputFormat.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Pass-through {@link OutputFormat} that forwards every call to a wrapped
+ * output format, so subclasses need override only the calls they change.
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.output.FilterOutputFormat} instead.
+ */
+@Deprecated
+public class FilterOutputFormat<K, V> implements OutputFormat<K, V> {
+
+  /** The wrapped format; remains null until one is supplied. */
+  protected OutputFormat<K,V> baseOut;
+
+  /** Create a filter that has no underlying output format yet. */
+  public FilterOutputFormat () {
+    this(null);
+  }
+
+  /**
+   * Create a filter around the given output format.
+   * @param out the underlying OutputFormat
+   */
+  public FilterOutputFormat (OutputFormat<K,V> out) {
+    baseOut = out;
+  }
+
+  /**
+   * Hand out the record writer produced by the wrapped format.
+   * @throws IOException if no wrapped format has been set (the wrapped
+   *   format may also throw)
+   */
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 
+      String name, Progressable progress) throws IOException {
+    OutputFormat<K,V> target = requireBaseOut();
+    return target.getRecordWriter(ignored, job, name, progress);
+  }
+
+  /**
+   * Let the wrapped format validate the job's output specification.
+   * @throws IOException if no wrapped format has been set (the wrapped
+   *   format may also throw)
+   */
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) 
+  throws IOException {
+    OutputFormat<K,V> target = requireBaseOut();
+    target.checkOutputSpecs(ignored, job);
+  }
+
+  // Returns the wrapped format, or fails when none has been set.
+  private OutputFormat<K,V> requireBaseOut() throws IOException {
+    if (baseOut != null) {
+      return baseOut;
+    }
+    throw new IOException("Outputformat not set for FilterOutputFormat");
+  }
+
+  /**
+   * Pass-through {@link RecordWriter} that forwards <code>write</code> and
+   * <code>close</code> to a wrapped writer.
+   */
+  public static class FilterRecordWriter<K,V> implements RecordWriter<K,V> {
+
+    /** The wrapped writer; remains null until one is supplied. */
+    protected RecordWriter<K,V> rawWriter;
+
+    /** Create a filter writer that has no underlying writer yet. */
+    public FilterRecordWriter() throws IOException {
+      this(null);
+    }
+
+    /**
+     * Create a filter writer around the given writer.
+     * @param rawWriter the underlying RecordWriter
+     */
+    public FilterRecordWriter(RecordWriter<K,V> rawWriter)  throws IOException {
+      this.rawWriter = rawWriter;
+    }
+
+    /** Close the wrapped writer; fails if none has been set. */
+    public void close(Reporter reporter) throws IOException {
+      RecordWriter<K,V> target = requireRawWriter();
+      target.close(reporter);
+    }
+
+    /** Write one record through the wrapped writer; fails if none is set. */
+    public void write(K key, V value) throws IOException {
+      RecordWriter<K,V> target = requireRawWriter();
+      target.write(key, value);
+    }
+
+    // Returns the wrapped writer, or fails when none has been set.
+    private RecordWriter<K,V> requireRawWriter() throws IOException {
+      if (rawWriter != null) {
+        return rawWriter;
+      }
+      throw new IOException ("Record Writer not set for FilterRecordWriter");
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LazyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LazyOutputFormat.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LazyOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LazyOutputFormat.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link OutputFormat} wrapper that creates output lazily: the record
+ * writer of the underlying format is requested only when the first record is
+ * written, so a task that writes nothing never asks for one.
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat} instead.
+ */
+@Deprecated
+public class LazyOutputFormat<K, V> extends FilterOutputFormat<K, V> {
+
+  /** Configuration key that names the underlying output format class. */
+  private static final String BASE_FORMAT_KEY = "mapred.lazy.output.format";
+
+  /**
+   * Set the underlying output format for LazyOutputFormat.
+   * Registers LazyOutputFormat as the job's output format and records
+   * <code>theClass</code> in the job configuration for later instantiation.
+   * @param job the {@link JobConf} to modify
+   * @param theClass the underlying class
+   */
+  @SuppressWarnings("unchecked")
+  public static void  setOutputFormatClass(JobConf job, 
+      Class<? extends OutputFormat> theClass) {
+      job.setOutputFormat(LazyOutputFormat.class);
+      job.setClass(BASE_FORMAT_KEY, theClass, OutputFormat.class);
+  }
+
+  /**
+   * Return a writer that postpones creating the underlying writer until the
+   * first call to <code>write</code>.
+   * @throws IOException if no underlying output format is configured
+   */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 
+      String name, Progressable progress) throws IOException {
+    if (baseOut == null) {
+      getBaseOutputFormat(job);
+    }
+    return new LazyRecordWriter<K, V>(job, baseOut, name, progress);
+  }
+
+  /**
+   * Validate the output specification using the underlying output format.
+   * @throws IOException if no underlying output format is configured
+   */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) 
+  throws IOException {
+    if (baseOut == null) {
+      getBaseOutputFormat(job);
+    }
+    super.checkOutputSpecs(ignored, job);
+  }
+
+  /**
+   * Instantiate the configured underlying output format into
+   * <code>baseOut</code>.
+   * @throws IOException if the configuration names no underlying format
+   */
+  @SuppressWarnings("unchecked")
+  private void getBaseOutputFormat(JobConf job) throws IOException {
+    Class<? extends OutputFormat> baseClass =
+        job.getClass(BASE_FORMAT_KEY, null, OutputFormat.class);
+    // Validate the configured class itself: checking the instantiated object
+    // would be too late, because instantiation fails first when the class is
+    // missing.
+    if (baseClass == null) {
+      throw new IOException("Output format not set for LazyOutputFormat");
+    }
+    baseOut = ReflectionUtils.newInstance(baseClass, job);
+  }
+  
+  /**
+   * <code>LazyRecordWriter</code> remembers everything needed to build the
+   * underlying writer and builds it on the first <code>write</code>.
+   */
+  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
+
+    private final OutputFormat<K,V> of;
+    private final String name;
+    private final Progressable progress;
+    private final JobConf job;
+
+    public LazyRecordWriter(JobConf job, OutputFormat<K,V> of, String name,
+        Progressable progress)  throws IOException {
+      this.of = of;
+      this.job = job;
+      this.name = name;
+      this.progress = progress;
+    }
+
+    /** Close the underlying writer, if one was ever created. */
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      // Nothing was written, so there is no underlying writer to close.
+      if (rawWriter != null) {
+        rawWriter.close(reporter);
+      }
+    }
+
+    /** Write a record, creating the underlying writer on first use. */
+    @Override
+    public void write(K key, V value) throws IOException {
+      if (rawWriter == null) {
+        createRecordWriter();
+      }
+      super.write(key, value);
+    }
+
+    // Asks the underlying format for its writer using the saved arguments.
+    private void createRecordWriter() throws IOException {
+      FileSystem fs = FileSystem.get(job);
+      rawWriter = of.getRecordWriter(fs, job, name, progress);
+    }  
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java?rev=746918&r1=746917&r2=746918&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java Mon Feb 23 08:22:04 2009
@@ -52,6 +52,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.LazyOutputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -361,6 +362,7 @@
       System.out.println("  [-writer <class>] // Java RecordWriter");
       System.out.println("  [-program <executable>] // executable URI");
       System.out.println("  [-reduces <num>] // number of reduces");
+      System.out.println("  [-lazyOutput] // createOutputLazily");
       System.out.println();
       GenericOptionsParser.printGenericCommandUsage(System.out);
     }
@@ -398,6 +400,8 @@
     cli.addOption("jobconf", false, 
         "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
         "key=val");
+    cli.addOption("lazyOutput", false, "Optional. Create output lazily",
+                  "boolean");
     Parser parser = cli.createParser();
     try {
       
@@ -446,6 +450,14 @@
         job.setOutputFormat(getClass(results, "-writer", job, 
                                       OutputFormat.class));
       }
+      
+      if (results.hasOption("-lazyOutput")) {
+        if (Boolean.parseBoolean((String)results.getValue("-lazyOutput"))) {
+          LazyOutputFormat.setOutputFormatClass(job,
+              job.getOutputFormat().getClass());
+        }
+      }
+      
       if (results.hasOption("-program")) {
         setExecutable(job, (String) results.getValue("-program"));
       }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Pass-through {@link OutputFormat} that forwards every call to a wrapped
+ * output format, so subclasses need override only the calls they change.
+ */
+public class FilterOutputFormat <K,V> extends OutputFormat<K, V> {
+
+  /** The wrapped format; remains null until one is supplied. */
+  protected OutputFormat<K,V> baseOut;
+
+  /** Create a filter that has no underlying output format yet. */
+  public FilterOutputFormat() {
+    this(null);
+  }
+  
+  /**
+   * Create a filter around the given output format.
+   * @param baseOut the underlying OutputFormat
+   */
+  public FilterOutputFormat(OutputFormat<K,V> baseOut) {
+    this.baseOut = baseOut;
+  }
+
+  /**
+   * Hand out the record writer produced by the wrapped format.
+   * @throws IOException if no wrapped format has been set (the wrapped
+   *   format may also throw)
+   */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    OutputFormat<K,V> target = requireBaseOut();
+    return target.getRecordWriter(context);
+  }
+
+  /**
+   * Let the wrapped format validate the job's output specification.
+   * @throws IOException if no wrapped format has been set (the wrapped
+   *   format may also throw)
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context) 
+  throws IOException, InterruptedException {
+    OutputFormat<K,V> target = requireBaseOut();
+    target.checkOutputSpecs(context);
+  }
+
+  /**
+   * Hand out the output committer supplied by the wrapped format.
+   * @throws IOException if no wrapped format has been set (the wrapped
+   *   format may also throw)
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    OutputFormat<K,V> target = requireBaseOut();
+    return target.getOutputCommitter(context);
+  }
+
+  // Returns the wrapped format, or fails when none has been set.
+  private OutputFormat<K,V> requireBaseOut() throws IOException {
+    if (baseOut != null) {
+      return baseOut;
+    }
+    throw new IOException("OutputFormat not set for FilterOutputFormat");
+  }
+
+  /**
+   * Pass-through {@link RecordWriter} that forwards <code>write</code> and
+   * <code>close</code> to a wrapped writer.
+   */
+  public static class FilterRecordWriter<K,V> extends RecordWriter<K,V> {
+
+    /** The wrapped writer; remains null until one is supplied. */
+    protected RecordWriter<K,V> rawWriter;
+
+    /** Create a filter writer that has no underlying writer yet. */
+    public FilterRecordWriter() {
+      this(null);
+    }
+    
+    /**
+     * Create a filter writer around the given writer.
+     * @param rwriter the underlying RecordWriter
+     */
+    public FilterRecordWriter(RecordWriter<K,V> rwriter) {
+      rawWriter = rwriter;
+    }
+    
+    /** Write one record through the wrapped writer; fails if none is set. */
+    @Override
+    public void write(K key, V value) throws IOException, InterruptedException {
+      RecordWriter<K,V> target = requireRawWriter();
+      target.write(key, value);
+    }
+
+    /** Close the wrapped writer; fails if none has been set. */
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException, InterruptedException {
+      RecordWriter<K,V> target = requireRawWriter();
+      target.close(context);
+    }
+    
+    // Returns the wrapped writer, or fails when none has been set.
+    private RecordWriter<K,V> requireRawWriter() throws IOException {
+      if (rawWriter != null) {
+        return rawWriter;
+      }
+      throw new IOException("Record Writer not set for FilterRecordWriter");
+    }
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A Convenience class that creates output lazily.  
+ */
+public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
+  /**
+   * Set the underlying output format for LazyOutputFormat.
+   * @param job the {@link Job} to modify
+   * @param theClass the underlying class
+   */
+  @SuppressWarnings("unchecked")
+  public static void  setOutputFormatClass(Job job, 
+                                     Class<? extends OutputFormat> theClass) {
+      job.setOutputFormatClass(LazyOutputFormat.class);
+      job.getConfiguration().setClass("mapred.lazy.output.format", 
+          theClass, OutputFormat.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void getBaseOutputFormat(Configuration conf) 
+  throws IOException {
+    baseOut =  ((OutputFormat<K, V>) ReflectionUtils.newInstance(
+        conf.getClass("mapred.lazy.output.format", null), conf));
+    if (baseOut == null) {
+      throw new IOException("Output Format not set for LazyOutputFormat");
+    }
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+    return new LazyRecordWriter<K, V>(baseOut, context);
+  }
+  
+  @Override
+  public void checkOutputSpecs(JobContext context) 
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+   super.checkOutputSpecs(context);
+  }
+  
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+    return super.getOutputCommitter(context);
+  }
+  
+  /**
+   * A convenience class to be used with LazyOutputFormat
+   */
+  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
+
+    final OutputFormat<K,V> outputFormat;
+    final TaskAttemptContext taskContext;
+
+    public LazyRecordWriter(OutputFormat<K,V> out, 
+                            TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+      this.outputFormat = out;
+      this.taskContext = taskContext;
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException, InterruptedException {
+      if (rawWriter == null) {
+        rawWriter = outputFormat.getRecordWriter(taskContext);
+      }
+      rawWriter.write(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException, InterruptedException {
+      if (rawWriter != null) {
+        rawWriter.close(context);
+      }
+    }
+
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLazyOutput.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLazyOutput.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLazyOutput.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.lib.LazyOutputFormat;
+import junit.framework.TestCase;
+
+/**
+ * A JUnit test to test the Map-Reduce framework's feature to create part
+ * files only if there is an explicit output.collect. This helps in preventing
+ * 0 byte files.
+ */
+public class TestLazyOutput extends TestCase {
+  private static final int NUM_HADOOP_SLAVES = 3;
+  private static final int NUM_MAPS_PER_NODE = 2;
+  private static final Path INPUT = new Path("/testlazy/input");
+
+  /** Lines written to every generated input file. */
+  private static final List<String> input = 
+    Arrays.asList("All","Roads","Lead","To","Hadoop");
+
+
+  /**
+   * Mapper that forwards each record unchanged, except in the task whose
+   * id ends in "0_0", which emits nothing.
+   */
+  static class TestMapper extends MapReduceBase
+  implements Mapper<LongWritable, Text, LongWritable, Text> {
+    private String id;
+
+    public void configure(JobConf job) {
+      id = job.get("mapred.task.id");
+    }
+
+    public void map(LongWritable key, Text val,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+    throws IOException {
+      // Everybody other than id 0 outputs
+      if (!id.endsWith("0_0")) {
+        output.collect(key, val);
+      }
+    }
+  }
+
+  /**
+   * Reducer that forwards each value unchanged, except in the task whose
+   * id ends in "0_0", which emits nothing.
+   */
+  static class TestReducer  extends MapReduceBase 
+  implements Reducer<LongWritable, Text, LongWritable, Text> {
+    private String id;
+
+    public void configure(JobConf job) {
+      id = job.get("mapred.task.id");
+    }
+
+    /** Writes all keys and values directly to output. */
+    public void reduce(LongWritable key, Iterator<Text> values,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+    throws IOException {
+      while (values.hasNext()) {
+        Text v = values.next();
+        //Reducer 0 skips collect
+        if (!id.endsWith("0_0")) {
+          output.collect(key, v);
+        }
+      }
+    }
+  }
+
+  /**
+   * Configure and run one job over {@link #INPUT}.
+   * @param job the job configuration to fill in and submit
+   * @param output the directory the job writes to
+   * @param numReducers number of reduce tasks (0 makes maps write the output)
+   * @param createLazily whether to wrap TextOutputFormat in LazyOutputFormat
+   */
+  private static void runTestLazyOutput(JobConf job, Path output,
+      int numReducers, boolean createLazily) 
+  throws Exception {
+
+    job.setJobName("test-lazy-output");
+
+    FileInputFormat.setInputPaths(job, INPUT);
+    FileOutputFormat.setOutputPath(job, output);
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setMapperClass(TestMapper.class);        
+    job.setReducerClass(TestReducer.class);
+
+    job.setNumReduceTasks(numReducers);
+    if (createLazily) {
+      LazyOutputFormat.setOutputFormatClass
+        (job, TextOutputFormat.class);
+    } else {
+      job.setOutputFormat(TextOutputFormat.class);
+    }
+
+    // runJob is static and submits the job itself, so no JobClient instance
+    // needs to be created (and left unclosed) here.
+    JobClient.runJob(job);
+  }
+
+  /**
+   * Create one input file per mapper under {@link #INPUT}, each holding the
+   * words of {@link #input} one per line.
+   */
+  public void createInput(FileSystem fs, int numMappers) throws Exception {
+    for (int i =0; i < numMappers; i++) {
+      OutputStream os = fs.create(new Path(INPUT, 
+        "text" + i + ".txt"));
+      Writer wr = new OutputStreamWriter(os);
+      try {
+        for(String inp : input) {
+          wr.write(inp+"\n");
+        }
+      } finally {
+        // Close even when a write fails so the stream is not leaked.
+        wr.close();
+      }
+    }
+  }
+
+  /**
+   * List the files under <code>dir</code> that pass OutputLogFilter and print
+   * each of them prefixed with <code>label</code>.
+   */
+  private static Path[] listOutputFiles(FileSystem fs, Path dir, String label)
+  throws IOException {
+    // NOTE(review): OutputLogFilter presumably hides the job's log directory
+    // so that only part files are counted -- confirm against its source.
+    Path[] fileList = 
+      FileUtil.stat2Paths(fs.listStatus(dir, new OutputLogFilter()));
+    for(int i=0; i < fileList.length; ++i) {
+      System.out.println(label + " File list[" + i + "]" + ": "+ fileList[i]);
+    }
+    return fileList;
+  }
+
+
+  public void testLazyOutput() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+
+      // Start the mini-MR and mini-DFS clusters
+      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
+
+      int numReducers = 2;
+      int numMappers = NUM_HADOOP_SLAVES * NUM_MAPS_PER_NODE;
+
+      createInput(fileSys, numMappers);
+
+      // Test 1. Lazy output with reducers: the reducer that skips collect
+      // must not leave a part file behind.
+      Path output1 = new Path("/testlazy/output1");
+      runTestLazyOutput(mr.createJobConf(), output1, 
+          numReducers, true);
+      Path[] fileList = listOutputFiles(fileSys, output1, "Test1");
+      assertEquals(numReducers - 1, fileList.length);
+
+      // Test 2. 0 Reducers, maps directly write to the output files
+      Path output2 = new Path("/testlazy/output2");
+      runTestLazyOutput(mr.createJobConf(), output2, 0, true);
+      fileList = listOutputFiles(fileSys, output2, "Test2");
+      assertEquals(numMappers - 1, fileList.length);
+
+      // Test 3. 0 Reducers, but flag is turned off
+      Path output3 = new Path("/testlazy/output3");
+      runTestLazyOutput(mr.createJobConf(), output3, 0, false);
+      fileList = listOutputFiles(fileSys, output3, "Test3");
+      assertEquals(numMappers, fileList.length);
+
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
+      }
+    }
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java?rev=746918&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java Mon Feb 23 08:22:04 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * A JUnit test to test the Map-Reduce framework's feature to create part
+ * files only if there is an explicit context.write. This helps in preventing
+ * 0 byte files.
+ */
+public class TestMapReduceLazyOutput extends TestCase {
+  private static final int NUM_HADOOP_SLAVES = 3;
+  private static final int NUM_MAPS_PER_NODE = 2;
+  private static final Path INPUT = new Path("/testlazy/input");
+
+  private static final List<String> input = 
+    Arrays.asList("All","Roads","Lead","To","Hadoop");
+
+  public static class TestMapper 
+  extends Mapper<LongWritable, Text, LongWritable, Text>{
+
+    public void map(LongWritable key, Text value, Context context
+    ) throws IOException, InterruptedException {
+      String id = context.getTaskAttemptID().toString();
+      // Mapper 0 does not output anything
+      if (!id.endsWith("0_0")) {
+        context.write(key, value);
+      }
+    }
+  }
+
+
+  public static class TestReducer 
+  extends Reducer<LongWritable,Text,LongWritable,Text> {
+    
+    public void reduce(LongWritable key, Iterable<Text> values, 
+        Context context) throws IOException, InterruptedException {
+      String id = context.getTaskAttemptID().toString();
+      // Reducer 0 does not output anything
+      if (!id.endsWith("0_0")) {
+        for (Text val: values) {
+          context.write(key, val);
+        }
+      }
+    }
+  }
+  
+  private static void runTestLazyOutput(Configuration conf, Path output,
+      int numReducers, boolean createLazily) 
+  throws Exception {
+    Job job = new Job(conf, "Test-Lazy-Output");
+
+    FileInputFormat.setInputPaths(job, INPUT);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setJarByClass(TestMapReduceLazyOutput.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(numReducers);
+
+    job.setMapperClass(TestMapper.class);
+    job.setReducerClass(TestReducer.class);
+
+    if (createLazily) {
+      LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+    } else {
+      job.setOutputFormatClass(TextOutputFormat.class);
+    }
+    assertTrue(job.waitForCompletion());
+  }
+
+  public void createInput(FileSystem fs, int numMappers) throws Exception {
+    for (int i =0; i < numMappers; i++) {
+      OutputStream os = fs.create(new Path(INPUT, 
+        "text" + i + ".txt"));
+      Writer wr = new OutputStreamWriter(os);
+      for(String inp : input) {
+        wr.write(inp+"\n");
+      }
+      wr.close();
+    }
+  }
+
+
+  public void testLazyOutput() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+
+      // Start the mini-MR and mini-DFS clusters
+      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
+
+      int numReducers = 2;
+      int numMappers = NUM_HADOOP_SLAVES * NUM_MAPS_PER_NODE;
+
+      createInput(fileSys, numMappers);
+      Path output1 = new Path("/testlazy/output1");
+
+      // Test 1. 
+      runTestLazyOutput(mr.createJobConf(), output1, 
+          numReducers, true);
+
+      Path[] fileList = 
+        FileUtil.stat2Paths(fileSys.listStatus(output1,
+            new OutputLogFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+      assertTrue(fileList.length == (numReducers - 1));
+
+      // Test 2. 0 Reducers, maps directly write to the output files
+      Path output2 = new Path("/testlazy/output2");
+      runTestLazyOutput(mr.createJobConf(), output2, 0, true);
+
+      fileList =
+        FileUtil.stat2Paths(fileSys.listStatus(output2,
+            new OutputLogFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+
+      assertTrue(fileList.length == numMappers - 1);
+
+      // Test 3. 0 Reducers, but flag is turned off
+      Path output3 = new Path("/testlazy/output3");
+      runTestLazyOutput(mr.createJobConf(), output3, 0, false);
+
+      fileList =
+        FileUtil.stat2Paths(fileSys.listStatus(output3,
+            new OutputLogFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+
+      assertTrue(fileList.length == numMappers);
+
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
+      }
+    }
+  }
+
+}