Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/23 19:16:59 UTC

svn commit: r729052 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapreduce/lib/input/ src/mapred/org/apache/hadoop/mapreduce/lib/map/ src/mapred/org/apache/hadoop/mapreduce/lib/output/

Author: omalley
Date: Tue Dec 23 10:16:59 2008
New Revision: 729052

URL: http://svn.apache.org/viewvc?rev=729052&view=rev
Log:
HADOOP-4909. Fix Javadoc and make parts of the API more consistent in their
use of JobContext instead of Configuration. (omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec 23 10:16:59 2008
@@ -301,14 +301,18 @@
 
     HADOOP-4753. Refactor gridmix2 to reduce code duplication. (cdouglas)
 
+    HADOOP-4909. Fix Javadoc and make parts of the API more consistent in
+    their use of JobContext instead of Configuration. (omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to provide locations for splits
     based on the rack/host that has the most bytes.
     (Jothi Padmanabhan via ddas)
 
-    HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents
-    in a separate thread. (Jothi Padmanabhan via ddas)
+    HADOOP-4683. Fixes Reduce shuffle scheduler to invoke
+    getMapCompletionEvents in a separate thread. (Jothi Padmanabhan
+    via ddas)
 
   BUG FIXES
 

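From a job author's point of view, the change means the static accessors on
these lib classes now take the new-API JobContext (or a Job, which is one)
instead of a raw Configuration. A minimal caller-side sketch, assuming a
hypothetical driver class:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        // Setters still go through the configuration, as in the diff below.
        job.getConfiguration().setLong("mapred.min.split.size",
                                       64 * 1024 * 1024);
        // Getters now take the JobContext itself, not a Configuration.
        long min = FileInputFormat.getMinSplitSize(job);
        long max = FileInputFormat.getMaxSplitSize(job);
        System.out.println("split sizes between " + min + " and " + max);
      }
    }
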
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Dec 23 10:16:59 2008
@@ -50,7 +50,7 @@
  */
 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
 
-  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
@@ -128,8 +128,13 @@
     job.getConfiguration().setLong("mapred.min.split.size", size);
   }
 
-  public static long getMinSplitSize(Configuration conf) {
-    return conf.getLong("mapred.min.split.size", 1L);
+  /**
+   * Get the minimum split size.
+   * @param job the job
+   * @return the minimum number of bytes that can be in a split
+   */
+  public static long getMinSplitSize(JobContext job) {
+    return job.getConfiguration().getLong("mapred.min.split.size", 1L);
   }
 
   /**
@@ -142,8 +147,14 @@
     job.getConfiguration().setLong("mapred.max.split.size", size);
   }
 
-  public static long getMaxSplitSize(Configuration conf) {
-    return conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
+  /**
+   * Get the maximum split size.
+   * @param context the job to look at.
+   * @return the maximum number of bytes a split can include
+   */
+  public static long getMaxSplitSize(JobContext context) {
+    return context.getConfiguration().getLong("mapred.max.split.size", 
+                                              Long.MAX_VALUE);
   }
 
   /**
@@ -151,7 +162,8 @@
    *
    * @return the PathFilter instance set for the job, NULL if none has been set.
    */
-  public static PathFilter getInputPathFilter(Configuration conf) {
+  public static PathFilter getInputPathFilter(JobContext context) {
+    Configuration conf = context.getConfiguration();
     Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
         PathFilter.class);
     return (filterClass != null) ?
@@ -166,8 +178,8 @@
    * @return array of FileStatus objects
    * @throws IOException if zero items.
    */
-  protected List<FileStatus> listStatus(Configuration job
-                                              ) throws IOException {
+  protected List<FileStatus> listStatus(JobContext job
+                                        ) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
     Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
@@ -188,7 +200,7 @@
     
     for (int i=0; i < dirs.length; ++i) {
       Path p = dirs[i];
-      FileSystem fs = p.getFileSystem(job); 
+      FileSystem fs = p.getFileSystem(job.getConfiguration()); 
       FileStatus[] matches = fs.globStatus(p, inputFilter);
       if (matches == null) {
         errors.add(new IOException("Input path does not exist: " + p));
@@ -216,11 +228,11 @@
   }
   
 
-  /** Splits files returned by {@link #listStatus(Configuration)} when
-   * they're too big.*/ 
-  public List<InputSplit> getSplits(JobContext context
+  /** 
+   * Generate the list of files and make them into FileSplits.
+   */ 
+  public List<InputSplit> getSplits(JobContext job
                                     ) throws IOException {
-    Configuration job = context.getConfiguration();    
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
@@ -228,10 +240,10 @@
     List<InputSplit> splits = new ArrayList<InputSplit>();
     for (FileStatus file: listStatus(job)) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(context.getConfiguration());
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
       long length = file.getLen();
       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(context, path)) { 
+      if ((length != 0) && isSplitable(job, path)) { 
         long blockSize = file.getBlockSize();
         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
 
@@ -391,11 +403,11 @@
   /**
    * Get the list of input {@link Path}s for the map-reduce job.
    * 
-   * @param conf The configuration of the job 
+   * @param context The job
    * @return the list of input {@link Path}s for the map-reduce job.
    */
-  public static Path[] getInputPaths(Configuration conf) {
-    String dirs = conf.get("mapred.input.dir", "");
+  public static Path[] getInputPaths(JobContext context) {
+    String dirs = context.getConfiguration().get("mapred.input.dir", "");
     String [] list = StringUtils.split(dirs);
     Path[] result = new Path[list.length];
     for (int i = 0; i < list.length; i++) {

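For subclasses the override point moves with the signature: listStatus now
receives a JobContext. A hypothetical subclass (name invented here) that
drops zero-length inputs:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class NonEmptyTextInputFormat extends TextInputFormat {
      @Override
      protected List<FileStatus> listStatus(JobContext job) throws IOException {
        List<FileStatus> files = new ArrayList<FileStatus>();
        for (FileStatus stat : super.listStatus(job)) {
          if (stat.getLen() > 0) {      // keep only non-empty inputs
            files.add(stat);
          }
        }
        return files;
      }
    }
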
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java Tue Dec 23 10:16:59 2008
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +29,7 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -43,12 +43,14 @@
     return new SequenceFileRecordReader<K,V>();
   }
 
+  @Override
   protected long getFormatMinSplitSize() {
     return SequenceFile.SYNC_INTERVAL;
   }
 
-  protected List<FileStatus> listStatus(Configuration job
-                                              )throws IOException {
+  @Override
+  protected List<FileStatus> listStatus(JobContext job
+                                        )throws IOException {
 
     List<FileStatus> files = super.listStatus(job);
     int len = files.size();
@@ -56,7 +58,7 @@
       FileStatus file = files.get(i);
       if (file.isDir()) {     // it's a MapFile
         Path p = file.getPath();
-        FileSystem fs = p.getFileSystem(job);
+        FileSystem fs = p.getFileSystem(job.getConfiguration());
         // use the data file
         files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
       }

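Callers never see the MapFile handling: per the listStatus override above, a
MapFile directory given as input is replaced by its data file. A usage sketch
(path and class name hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class SeqInputSetup {
      public static void configure(Job job) throws IOException {
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // A MapFile directory works as input; listStatus substitutes its
        // "data" file for the directory entry.
        FileInputFormat.addInputPath(job, new Path("/user/x/mapfile-output"));
      }
    }
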
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Tue Dec 23 10:16:59 2008
@@ -41,6 +41,7 @@
     return new LineRecordReader();
   }
 
+  @Override
   protected boolean isSplitable(JobContext context, Path file) {
     CompressionCodec codec = 
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);

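The isSplitable(JobContext, Path) hook above is also the natural override
point when records must never cross file boundaries; a hypothetical variant:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class UnsplittableTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        return false;   // every file becomes exactly one split
      }
    }
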
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Tue Dec 23 10:16:59 2008
@@ -22,6 +22,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -58,38 +60,71 @@
   private Context outer;
   private List<MapRunner> runners;
 
-  public static int getNumberOfThreads(Configuration conf) {
-    return conf.getInt("mapred.map.multithreadedrunner.threads", 10);
-  }
-
-  public static void setNumberOfThreads(Configuration conf, int threads) {
-    conf.setInt("mapred.map.multithreadedrunner.threads", threads);
-  }
-
+  /**
+   * The number of threads in the thread pool that will run the map function.
+   * @param job the job
+   * @return the number of threads
+   */
+  public static int getNumberOfThreads(JobContext job) {
+    return job.getConfiguration().
+            getInt("mapred.map.multithreadedrunner.threads", 10);
+  }
+
+  /**
+   * Set the number of threads in the pool for running maps.
+   * @param job the job to modify
+   * @param threads the new number of threads
+   */
+  public static void setNumberOfThreads(Job job, int threads) {
+    job.getConfiguration().setInt("mapred.map.multithreadedrunner.threads", 
+                                  threads);
+  }
+
+  /**
+   * Get the application's mapper class.
+   * @param <K1> the map's input key type
+   * @param <V1> the map's input value type
+   * @param <K2> the map's output key type
+   * @param <V2> the map's output value type
+   * @param job the job
+   * @return the mapper class to run
+   */
   @SuppressWarnings("unchecked")
   public static <K1,V1,K2,V2>
-  Class<Mapper<K1,V1,K2,V2>> getMapperClass(Configuration conf) {
+  Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
     return (Class<Mapper<K1,V1,K2,V2>>) 
-           conf.getClass("mapred.map.multithreadedrunner.class",
-                         Mapper.class);
+         job.getConfiguration().getClass("mapred.map.multithreadedrunner.class",
+                                         Mapper.class);
   }
   
+  /**
+   * Set the application's mapper class.
+   * @param <K1> the map input key type
+   * @param <V1> the map input value type
+   * @param <K2> the map output key type
+   * @param <V2> the map output value type
+   * @param job the job to modify
+   * @param cls the class to use as the mapper
+   */
   public static <K1,V1,K2,V2> 
-  void setMapperClass(Configuration conf, 
+  void setMapperClass(Job job, 
                       Class<Mapper<K1,V1,K2,V2>> cls) {
     if (MultithreadedMapper.class.isAssignableFrom(cls)) {
       throw new IllegalArgumentException("Can't have recursive " + 
                                          "MultithreadedMapper instances.");
     }
-    conf.setClass("mapred.map.multithreadedrunner.class", cls, Mapper.class);
+    job.getConfiguration().setClass("mapred.map.multithreadedrunner.class",
+                                    cls, Mapper.class);
   }
 
+  /**
+   * Run the application's maps using a thread pool.
+   */
   @Override
   public void run(Context context) throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
     outer = context;
-    int numberOfThreads = getNumberOfThreads(conf);
-    mapClass = getMapperClass(conf);
+    int numberOfThreads = getNumberOfThreads(context);
+    mapClass = getMapperClass(context);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
                 " threads");

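Configuring the runner now goes through the Job. Note that with the
Class<Mapper<K1,V1,K2,V2>> parameter shown above, passing a concrete mapper
class still needs an unchecked cast; a sketch (wiring hypothetical):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;

    public class MultithreadedSetup {
      @SuppressWarnings("unchecked")
      public static void configure(Job job) {
        job.setMapperClass(MultithreadedMapper.class);
        // The wrapped mapper does the real work; the double cast satisfies
        // the Class<Mapper<K1,V1,K2,V2>> parameter type.
        MultithreadedMapper.setMapperClass(job,
            (Class<Mapper<Object, Text, Text, IntWritable>>)
            (Class<?>) TokenCounterMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 8);
      }
    }
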
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java Tue Dec 23 10:16:59 2008
@@ -25,6 +25,9 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 
+/**
+ * Tokenize the input values and emit each word with a count of 1.
+ */
 public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
     
   private final static IntWritable one = new IntWritable(1);

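Wired into a job, TokenCounterMapper pairs naturally with a summing reducer;
the map output classes follow its declaration above. A sketch (class name
hypothetical):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;

    public class WordCountSetup {
      public static void configure(Job job) {
        job.setMapperClass(TokenCounterMapper.class);
        job.setMapOutputKeyClass(Text.class);           // the token
        job.setMapOutputValueClass(IntWritable.class);  // always 1
      }
    }
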
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Dec 23 10:16:59 2008
@@ -47,6 +47,12 @@
   private Path outputPath = null;
   private Path workPath = null;
 
+  /**
+   * Create a file output committer
+   * @param outputPath the job's output path
+   * @param context the task's context
+   * @throws IOException
+   */
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
     if (outputPath != null) {
@@ -59,6 +65,11 @@
     }
   }
 
+  /**
+   * Create the temporary directory that is the root of all of the task 
+   * work directories.
+   * @param context the job's context
+   */
   public void setupJob(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
@@ -69,6 +80,10 @@
     }
   }
 
+  /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   */
   public void cleanupJob(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
@@ -79,12 +94,20 @@
     }
   }
 
+  /**
+   * No task setup required.
+   */
+  @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
     // FileOutputCommitter's setupTask doesn't do anything. Because the
     // temporary task directory is created on demand when the 
     // task is writing.
   }
-		  
+
+  /**
+   * Move the files from the work directory to the job output directory
+   * @param context the task context
+   */
   public void commitTask(TaskAttemptContext context) 
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
@@ -103,7 +126,15 @@
       }
     }
   }
-		  
+
+  /**
+   * Move all of the files from the work directory to the final output
+   * @param context the task context
+   * @param fs the output file system
+   * @param jobOutputDir the final output directory
+   * @param taskOutput the work path
+   * @throws IOException
+   */
   private void moveTaskOutputs(TaskAttemptContext context,
                                FileSystem fs,
                                Path jobOutputDir,
@@ -137,6 +168,10 @@
     }
   }
 
+  /**
+   * Delete the work directory
+   */
+  @Override
   public void abortTask(TaskAttemptContext context) {
     try {
       context.progress();
@@ -146,11 +181,20 @@
     }
   }
 
+  /**
+   * Find the final name of a given output file, given the job output directory
+   * and the work directory.
+   * @param jobOutputDir the job's output directory
+   * @param taskOutput the specific task output file
+   * @param taskOutputPath the job's work directory
+   * @return the final path for the specific output file
+   * @throws IOException
+   */
   private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
                             Path taskOutputPath) throws IOException {
     URI taskOutputUri = taskOutput.toUri();
     URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+    if (taskOutputUri == relativePath) {
       throw new IOException("Can not get the relative path: base = " + 
           taskOutputPath + " child = " + taskOutput);
     }
@@ -161,6 +205,11 @@
     }
   }
 
+  /**
+   * Did this task write any files in the work directory?
+   * @param context the task's context
+   */
+  @Override
   public boolean needsTaskCommit(TaskAttemptContext context
                                  ) throws IOException {
     return workPath != null && outputFileSystem.exists(workPath);

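Taken together, the new Javadoc spells out the commit protocol. Driven by
hand it looks roughly like this (in real use the framework supplies the
contexts and calls these methods itself):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    public class CommitProtocolSketch {
      public static void run(Path outputPath, TaskAttemptContext ctx)
          throws IOException {
        FileOutputCommitter committer =
            new FileOutputCommitter(outputPath, ctx);
        committer.setupJob(ctx);             // create the temporary root dir
        // ... the task writes its files under the committer's work path ...
        if (committer.needsTaskCommit(ctx)) {
          committer.commitTask(ctx);         // promote files to outputPath
        }
        committer.cleanupJob(ctx);           // delete the temporary root dir
      }
    }
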
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -59,12 +59,12 @@
   
   /**
    * Is the job output compressed?
-   * @param conf the {@link Configuration} to look in
+   * @param job the Job to look in
    * @return <code>true</code> if the job output should be compressed,
    *         <code>false</code> otherwise
    */
-  public static boolean getCompressOutput(Configuration conf) {
-    return conf.getBoolean("mapred.output.compress", false);
+  public static boolean getCompressOutput(JobContext job) {
+    return job.getConfiguration().getBoolean("mapred.output.compress", false);
   }
   
   /**
@@ -84,17 +84,17 @@
   
   /**
    * Get the {@link CompressionCodec} for compressing the job outputs.
-   * @param conf the {@link Configuration} to look in
+   * @param job the {@link Job} to look in
    * @param defaultValue the {@link CompressionCodec} to return if not set
    * @return the {@link CompressionCodec} to be used to compress the 
    *         job outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
   public static Class<? extends CompressionCodec> 
-  getOutputCompressorClass(Configuration conf, 
+  getOutputCompressorClass(JobContext job, 
 		                       Class<? extends CompressionCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
-    
+    Configuration conf = job.getConfiguration();
     String name = conf.get("mapred.output.compression.codec");
     if (name != null) {
       try {
@@ -109,18 +109,17 @@
   }
   
   public abstract RecordWriter<K, V> 
-     getRecordWriter(TaskAttemptContext context
+     getRecordWriter(TaskAttemptContext job
                      ) throws IOException, InterruptedException;
 
-  public void checkOutputSpecs(JobContext context
+  public void checkOutputSpecs(JobContext job
                                ) throws FileAlreadyExistsException, IOException{
     // Ensure that the output directory is set and not already there
-    Configuration job = context.getConfiguration();
     Path outDir = getOutputPath(job);
     if (outDir == null) {
       throw new InvalidJobConfException("Output directory not set.");
     }
-    if (outDir.getFileSystem(job).exists(outDir)) {
+    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 
                                            " already exists");
     }
@@ -143,8 +142,8 @@
    * @return the {@link Path} to the output directory for the map-reduce job.
    * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
    */
-  public static Path getOutputPath(Configuration conf) {
-    String name = conf.get("mapred.output.dir");
+  public static Path getOutputPath(JobContext job) {
+    String name = job.getConfiguration().get("mapred.output.dir");
     return name == null ? null: new Path(name);
   }
   
@@ -261,7 +260,7 @@
      OutputCommitter getOutputCommitter(TaskAttemptContext context
                                         ) throws IOException {
     if (committer == null) {
-      Path output = getOutputPath(context.getConfiguration());
+      Path output = getOutputPath(context);
       committer = new FileOutputCommitter(output, context);
     }
     return committer;

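The output-side accessors follow the same pattern; a small sketch of reading
the compression settings through a JobContext:

    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CompressionQuery {
      public static Class<? extends CompressionCodec> codecFor(JobContext job) {
        if (!FileOutputFormat.getCompressOutput(job)) {
          return null;                       // output left uncompressed
        }
        return FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
      }
    }
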
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,12 +45,13 @@
     
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
-    if (getCompressOutput(conf)) {
+    if (getCompressOutput(context)) {
       // find the kind of compression to do
-      compressionType = getOutputCompressionType(conf);
+      compressionType = getOutputCompressionType(context);
 
       // find the right codec
-      Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class);
+      Class<?> codecClass = getOutputCompressorClass(context, 
+                                                     DefaultCodec.class);
       codec = (CompressionCodec) 
         ReflectionUtils.newInstance(codecClass, conf);
     }
@@ -80,13 +82,13 @@
 
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
-   * @param conf the {@link Configuration}
+   * @param job the {@link Job}
    * @return the {@link CompressionType} for the output {@link SequenceFile}, 
    *         defaulting to {@link CompressionType#RECORD}
    */
-  public static CompressionType getOutputCompressionType(Configuration conf) {
-    String val = conf.get("mapred.output.compression.type", 
-                          CompressionType.RECORD.toString());
+  public static CompressionType getOutputCompressionType(JobContext job) {
+    String val = job.getConfiguration().get("mapred.output.compression.type", 
+                                            CompressionType.RECORD.toString());
     return CompressionType.valueOf(val);
   }
   

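The compression type still lives in the configuration under the key read
above; setting it there and reading it back through the new accessor:

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class SeqOutputCompression {
      public static void configure(Job job) {
        job.getConfiguration().setBoolean("mapred.output.compress", true);
        job.getConfiguration().set("mapred.output.compression.type",
                                   CompressionType.BLOCK.toString());
        // Reads back BLOCK; without the set() above it defaults to RECORD.
        CompressionType type =
            SequenceFileOutputFormat.getOutputCompressionType(job);
      }
    }
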
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -108,26 +108,27 @@
   }
 
   public RecordWriter<K, V> 
-         getRecordWriter(TaskAttemptContext context
+         getRecordWriter(TaskAttemptContext job
                          ) throws IOException, InterruptedException {
-    Configuration job = context.getConfiguration();
+    Configuration conf = job.getConfiguration();
     boolean isCompressed = getCompressOutput(job);
-    String keyValueSeparator= job.get("mapred.textoutputformat.separator","\t");
+    String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
+                                       "\t");
     CompressionCodec codec = null;
     String extension = "";
     if (isCompressed) {
       Class<? extends CompressionCodec> codecClass = 
         getOutputCompressorClass(job, GzipCodec.class);
-      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
       extension = codec.getDefaultExtension();
     }
-    Path file = getDefaultWorkFile(context, extension);
-    FileSystem fs = file.getFileSystem(job);
+    Path file = getDefaultWorkFile(job, extension);
+    FileSystem fs = file.getFileSystem(conf);
     if (!isCompressed) {
-      FSDataOutputStream fileOut = fs.create(file, context);
+      FSDataOutputStream fileOut = fs.create(file, false);
       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
     } else {
-      FSDataOutputStream fileOut = fs.create(file, context);
+      FSDataOutputStream fileOut = fs.create(file, false);
       return new LineRecordWriter<K, V>(new DataOutputStream
                                         (codec.createOutputStream(fileOut)),
                                         keyValueSeparator);
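
The key/value separator read in getRecordWriter above is configurable; a
closing sketch:

    import org.apache.hadoop.mapreduce.Job;

    public class SeparatorSetup {
      public static void configure(Job job) {
        // TextOutputFormat reads this key, defaulting to "\t" per the code
        // above; here we emit comma-separated records instead.
        job.getConfiguration().set("mapred.textoutputformat.separator", ",");
      }
    }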