You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/05/29 16:43:41 UTC

svn commit: r1343755 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...

Author: tomwhite
Date: Tue May 29 14:43:40 2012
New Revision: 1343755

URL: http://svn.apache.org/viewvc?rev=1343755&view=rev
Log:
MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. Contributed by Ahmed Radwan.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue May 29 14:43:40 2012
@@ -126,6 +126,9 @@ Release 2.0.1-alpha - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4146. Support limits on task status string length and number of
+    block locations in branch-2. (Ahmed Radwan via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue May 29 14:43:40 2012
@@ -53,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -569,7 +568,21 @@ abstract public class Task implements Wr
         resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
     }
   }
-  
+
+  /** Truncates {@code status} to the configured maximum length. */
+  public static String normalizeStatus(String status, Configuration conf) {
+    // null passes through untouched; a negative limit is clamped to zero.
+    int progressStatusLength = Math.max(0, conf.getInt(
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT));
+    if (status != null && status.length() > progressStatusLength) {
+      LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
+          + progressStatusLength + " characters)");
+      status = status.substring(0, progressStatusLength);
+    }
+    return status;
+  }
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   protected class TaskReporter 
@@ -603,7 +616,7 @@ abstract public class Task implements Wr
       return progressFlag.getAndSet(false);
     }
     public void setStatus(String status) {
-      taskProgress.setStatus(status);
+      taskProgress.setStatus(normalizeStatus(status, conf));
       // indicate that progress update needs to be sent
       setProgressFlag();
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Tue May 29 14:43:40 2012
@@ -71,4 +71,12 @@ public interface MRConfig {
 
   public static final String TASK_LOCAL_OUTPUT_CLASS =
   "mapreduce.task.local.output.class";
+
+  public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
+    "mapreduce.task.max.status.length"; // max task status length, in characters
+  public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
+
+  public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10; // locations per split
+  public static final String MAX_BLOCK_LOCATIONS_KEY =
+    "mapreduce.job.max.split.locations"; // config key for the per-split limit
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Tue May 29 14:43:40 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -48,6 +49,7 @@ public class JobSplitWriter {
 
   private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
   private static final byte[] SPLIT_FILE_HEADER;
+
   static {
     try {
       SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
@@ -82,7 +84,7 @@ public class JobSplitWriter {
   throws IOException {
     FSDataOutputStream out = createFile(fs, 
         JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
-    SplitMetaInfo[] info = writeOldSplits(splits, out);
+    SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
     out.close();
     writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
         new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
@@ -114,6 +116,8 @@ public class JobSplitWriter {
     if (array.length != 0) {
       SerializationFactory factory = new SerializationFactory(conf);
       int i = 0;
+      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
       long offset = out.getPos();
       for(T split: array) {
         long prevCount = out.getPos();
@@ -123,9 +127,15 @@ public class JobSplitWriter {
         serializer.open(out);
         serializer.serialize(split);
         long currCount = out.getPos();
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+          throw new IOException("Max block location exceeded for split: "
+              + split + " splitsize: " + locations.length +
+              " maxsize: " + maxBlockLocations);
+        }
         info[i++] = 
           new JobSplit.SplitMetaInfo( 
-              split.getLocations(), offset,
+              locations, offset,
               split.getLength());
         offset += currCount - prevCount;
       }
@@ -135,18 +145,26 @@ public class JobSplitWriter {
   
   private static SplitMetaInfo[] writeOldSplits(
       org.apache.hadoop.mapred.InputSplit[] splits,
-      FSDataOutputStream out) throws IOException {
+      FSDataOutputStream out, Configuration conf) throws IOException {
     SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
     if (splits.length != 0) {
       int i = 0;
       long offset = out.getPos();
+      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
       for(org.apache.hadoop.mapred.InputSplit split: splits) {
         long prevLen = out.getPos();
         Text.writeString(out, split.getClass().getName());
         split.write(out);
         long currLen = out.getPos();
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+          throw new IOException("Max block location exceeded for split: "
+              + split + " splitsize: " + locations.length +
+              " maxsize: " + maxBlockLocations);
+        }
         info[i++] = new JobSplit.SplitMetaInfo( 
-            split.getLocations(), offset,
+            locations, offset,
             split.getLength());
         offset += currLen - prevLen;
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Tue May 29 14:43:40 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.task
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.StatusReporter;
@@ -92,8 +93,9 @@ public class TaskAttemptContextImpl exte
    */
   @Override
   public void setStatus(String status) {
-    setStatusString(status);
-    reporter.setStatus(status);
+    String normalizedStatus = Task.normalizeStatus(status, conf);
+    setStatusString(normalizedStatus);
+    reporter.setStatus(normalizedStatus);
   }
 
   public static class DummyReporter extends StatusReporter {

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java?rev=1343755&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java Tue May 29 14:43:40 2012
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A JUnit test to test limits on block locations
+ */
+public class TestBlockLimits extends TestCase {
+  private static final String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  /** Starts a mini MR cluster, runs the over-limit job on it, then stops it. */
+  public void testWithLimits() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    MiniMRClientCluster mr = null;
+    try {
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
+          new Configuration());
+      runCustomFormat(mr);
+    } finally {
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
+
+  /** Submits a job whose splits each report 200 locations; it must fail. */
+  private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
+    JobConf job = new JobConf(mr.getConfig());
+    FileSystem fileSys = FileSystem.get(job);
+    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+    Path outDir = new Path(testDir, "out");
+    System.out.println("testDir= " + testDir);
+    fileSys.delete(testDir, true);
+    job.setInputFormat(MyInputFormat.class);
+    job.setOutputFormat(MyOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    job.setMapperClass(MyMapper.class);
+    job.setReducerClass(MyReducer.class);
+    job.setNumMapTasks(100);
+    job.setNumReduceTasks(1);
+    job.set("non.std.out", outDir.toString());
+    try {
+      JobClient.runJob(job);
+      fail("Job succeeded although its splits exceed the block location limit");
+    } catch (IOException ie) {
+      // Expected. NOTE(review): any IOException passes; consider checking it.
+      System.out.println("Failed job " + StringUtils.stringifyException(ie));
+    } finally {
+      fileSys.delete(testDir, true);
+    }
+  }
+
+  static class MyMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+    }
+  }
+
+  static class MyReducer extends MapReduceBase implements
+      Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+    public void reduce(WritableComparable key, Iterator<Writable> values,
+        OutputCollector<WritableComparable, Writable> output, Reporter reporter)
+        throws IOException {
+    }
+  }
+
+  private static class MyInputFormat implements InputFormat<IntWritable, Text> {
+    private static class MySplit implements InputSplit {
+      int first;
+      int length;
+
+      public MySplit() {
+      }
+
+      public MySplit(int first, int length) {
+        this.first = first;
+        this.length = length;
+      }
+
+      public String[] getLocations() {
+        // 200 entries: more than MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT (10).
+        return new String[200];
+      }
+
+      public long getLength() {
+        return length;
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, first);
+        WritableUtils.writeVInt(out, length);
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        first = WritableUtils.readVInt(in);
+        length = WritableUtils.readVInt(in);
+      }
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
+          new MySplit(4, 2) };
+    }
+
+    public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return null;
+    }
+  }
+
+  static class MyOutputFormat implements OutputFormat {
+    static class MyRecordWriter implements RecordWriter<Object, Object> {
+
+      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
+      }
+
+      public void write(Object key, Object value) throws IOException {
+        return;
+      }
+
+      public void close(Reporter reporter) throws IOException {
+      }
+    }
+
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+        String name, Progressable progress) throws IOException {
+      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
+    }
+
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+        throws IOException {
+    }
+  }
+
+}
\ No newline at end of file

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java?rev=1343755&r1=1343754&r2=1343755&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java Tue May 29 14:43:40 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -25,10 +26,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 /**
@@ -98,7 +104,28 @@ public class TestReporter {
                    progressRange, reporter.getProgress(), 0f);
     }
   }
-  
+
+  static class StatusLimitMapper extends
+      org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
+
+    /** Sets a 1000-character status and fails unless it comes back truncated. */
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException {
+      char[] filler = new char[1000];
+      java.util.Arrays.fill(filler, 'a');
+      context.setStatus(new String(filler));
+
+      int maxStatusLength = context.getConfiguration().getInt(
+          MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+          MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+      String reported = context.getStatus();
+      if (reported.length() > maxStatusLength) {
+        throw new IOException("Status is not truncated");
+      }
+    }
+  }
+
   /**
    * Test {@link Reporter}'s progress for a map-only job.
    * This will make sure that only the map phase decides the attempt's progress.
@@ -166,7 +193,6 @@ public class TestReporter {
   /**
    * Test {@link Reporter}'s progress for map-reduce job.
    */
-  @SuppressWarnings("deprecation")
   @Test
   public void testReporterProgressForMRJob() throws IOException {
     Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
@@ -186,4 +212,39 @@ public class TestReporter {
     
     assertTrue("Job failed", job.isSuccessful());
   }
+
+  /** Runs a map-only job with {@link StatusLimitMapper}; it must succeed. */
+  @Test
+  public void testStatusLimit() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Path test = new Path(testRootTempDir, "testStatusLimit");
+
+    Configuration conf = new Configuration();
+    Path inDir = new Path(test, "in");
+    Path outDir = new Path(test, "out");
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(inDir)) {
+      fs.delete(inDir, true);
+    }
+    fs.mkdirs(inDir);
+    // Single small input file for the job to read.
+    DataOutputStream file = fs.create(new Path(inDir, "part-" + 0));
+    try {
+      file.writeBytes("testStatusLimit");
+    } finally {
+      file.close(); // release the stream even when the write fails
+    }
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+
+    Job job = Job.getInstance(conf, "testStatusLimit");
+    job.setMapperClass(StatusLimitMapper.class);
+    job.setNumReduceTasks(0);
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+  }
+
 }
\ No newline at end of file