You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/04/24 19:38:44 UTC

svn commit: r1471556 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...

Author: acmurthy
Date: Wed Apr 24 17:38:43 2013
New Revision: 1471556

URL: http://svn.apache.org/r1471556
Log:
MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent with the mapred API w.r.t. Mapper.cleanup and Reducer.cleanup, in the sense that cleanup is now called even if there is an error. The old mapred API already ensures that Mapper.close and Reducer.close are invoked during error handling. Note that this is an incompatible change; however, end-users can override Mapper.run and Reducer.run to get the old (inconsistent) behaviour. Contributed by Arun C. Murthy.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1471556&r1=1471555&r2=1471556&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Apr 24 17:38:43 2013
@@ -339,6 +339,14 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users 
+    can override Mapper.run and Reducer.run to get the old (inconsistent) 
+    behaviour. (acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1471556&r1=1471555&r2=1471556&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:38:43 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+      
+      in.close();
+      in = null;
+      
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -753,13 +758,20 @@ public class MapTask extends Task {
           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
               mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
 
   class DirectMapOutputCollector<K, V>
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Best-effort close: log the failure and continue instead of rethrowing
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Best-effort close: any Exception from close() is logged and dropped
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Best-effort close: any Exception from close() is logged and dropped
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Best-effort close: any Exception from close(mapperContext) is logged and dropped
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1471556&r1=1471555&r2=1471556&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:38:43 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+    RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
         this, job, reporter, finalName);
-
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+    
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
       
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {  // best-effort close: log and continue, never rethrow
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1471556&r1=1471555&r2=1471556&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:38:43 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1471556&r1=1471555&r2=1471556&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Wed Apr 24 17:38:43 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
-      // If a back up store is used, reset it
-      Iterator<VALUEIN> iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator<VALUEIN> iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java?rev=1471556&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java Wed Apr 24 17:38:43 2013
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMapperReducerCleanup {
+
+  static boolean mapCleanup = false;
+  static boolean reduceCleanup = false;
+  static boolean recordReaderCleanup = false;
+  static boolean recordWriterCleanup = false;
+  
+  static void reset() {
+    mapCleanup = false;
+    reduceCleanup = false; 
+    recordReaderCleanup = false;
+    recordWriterCleanup = false;
+  }
+  
+  private static class FailingMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    /** Always fails: every call throws an IOException, whatever the input record. */
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      mapCleanup = true;  // record that cleanup ran so the test can assert on it
+      super.cleanup(context);
+    }
+  }
+
+  private static class TrackingTokenizerMapper 
+  extends Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+      
+    public void map(Object key, Text value, Context context
+                    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException {
+      mapCleanup = true;
+      super.cleanup(context);
+    }
+    
+  }
+
+  private static class FailingReducer
+      extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+    public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static class TrackingIntSumReducer extends IntSumReducer {
+
+    @SuppressWarnings("unchecked")
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+}
+
+  public static class TrackingTextInputFormat extends TextInputFormat {
+
+    public static class TrackingRecordReader extends LineRecordReader {
+      @Override
+      public synchronized void close() throws IOException {
+        recordReaderCleanup = true;
+        super.close();
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new TrackingRecordReader();
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static class TrackingTextOutputFormat extends TextOutputFormat {
+    
+    public static class TrackingRecordWriter extends LineRecordWriter {
+
+      public TrackingRecordWriter(DataOutputStream out) {
+        super(out);
+      }
+
+      @Override
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
+        recordWriterCleanup = true;
+        super.close(context);
+      }
+
+    }
+    
+    @Override
+    public RecordWriter getRecordWriter(TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      Configuration conf = job.getConfiguration();
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(conf);
+      FSDataOutputStream fileOut = fs.create(file, false);
+      
+      return new TrackingRecordWriter(fileOut);
+    }
+  
+  }
+
+
+  /**
+   * Create a single input file in the given directory on the local file system.
+   * @param dirPath the directory in which the file resides
+   * @param id the file id number; also used as the file name and written into each line
+   * @param numRecords how many records (one text line each) to write to the file.
+   */
+  private void createInputFile(Path dirPath, int id, int numRecords)
+      throws IOException {
+    final String MESSAGE = "This is a line in a file: ";
+
+    Path filePath = new Path(dirPath, "" + id);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+    for (int i = 0; i < numRecords; i++) {
+      w.write(MESSAGE + id + " " + i + "\n");
+    }
+
+    w.close();
+  }
+
+  private final String INPUT_DIR = "input";
+  private final String OUTPUT_DIR = "output";
+
+  private Path getInputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(INPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), INPUT_DIR);
+    }
+  }
+
+  private Path getOutputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(OUTPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), OUTPUT_DIR);
+    }
+  }
+
+  private Path createInput() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path inputPath = getInputPath();
+
+    // Clear the input directory if it exists, first.
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create an input file
+    createInputFile(inputPath, 0, 10);
+
+    return inputPath;
+  }
+
+  @Test
+  public void testMapCleanup() throws Exception {
+    reset();
+    
+    Job job = Job.getInstance();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(FailingMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(0);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+  @Test
+  public void testReduceCleanup() throws Exception {
+    reset();
+    
+    Job job = Job.getInstance();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(FailingReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+  
+  @Test
+  public void testJobSuccessCleanup() throws Exception {
+    reset();
+    
+    Job job = Job.getInstance();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(TrackingIntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+}