Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/08/11 14:03:47 UTC

svn commit: r684731 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/lib/ src/ma...

Author: ddas
Date: Mon Aug 11 05:03:37 2008
New Revision: 684731

URL: http://svn.apache.org/viewvc?rev=684731&view=rev
Log:
HADOOP-153. Provides a way to skip bad records. Contributed by Sharad Agarwal.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 11 05:03:37 2008
@@ -70,6 +70,8 @@
     HADOOP-2302. Provides a comparator for numerical sorting of key fields.
     (ddas)
 
+    HADOOP-153. Provides a way to skip bad records. (Sharad Agarwal via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay initialization of datanode block verification till

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Aug 11 05:03:37 2008
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
@@ -34,6 +35,7 @@
 public class PipeMapper extends PipeMapRed implements Mapper {
 
   private boolean ignoreKey = false;
+  private boolean skipping = false;
 
   private byte[] mapOutputFieldSeparator;
   private byte[] mapInputFieldSeparator;
@@ -59,6 +61,11 @@
   
   public void configure(JobConf job) {
     super.configure(job);
+    //disable the auto increment of the counter. For streaming, the number
+    //of processed records could differ from (be equal to or less than)
+    //the number of input records.
+    SkipBadRecords.setAutoIncrMapperProcCount(job, false);
+    skipping = job.getBoolean("mapred.skip.on", false);
     String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
     ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
 
@@ -101,6 +108,11 @@
         }
         write(value);
         clientOut_.write('\n');
+        if(skipping) {
+          //flush the streams on every record input if running in skip mode
+          //so that we don't buffer other records surrounding a bad record. 
+          clientOut_.flush();
+        }
       } else {
         numRecSkipped_++;
       }

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Aug 11 05:03:37 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Writable;
@@ -39,6 +40,7 @@
   private byte[] reduceOutFieldSeparator;
   private byte[] reduceInputFieldSeparator;
   private int numOfReduceOutputKeyFields = 1;
+  private boolean skipping = false;
   
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.reduce.streamprocessor");
@@ -61,6 +63,11 @@
 
   public void configure(JobConf job) {
     super.configure(job);
+    //disable the auto increment of the counter. For streaming, the number
+    //of processed records could differ from (be equal to or less than)
+    //the number of input records.
+    SkipBadRecords.setAutoIncrReducerProcCount(job, false);
+    skipping = job.getBoolean("mapred.skip.on", false);
 
     try {
       reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
@@ -99,6 +106,11 @@
           output.collect(key, val);
         }
       }
+      if(doPipe_ && skipping) {
+        //flush the streams on every record input if running in skip mode
+        //so that we don't buffer other records surrounding a bad record. 
+        clientOut_.flush();
+      }
     } catch (IOException io) {
       // a common reason to get here is failure of the subprocess.
       // Document that fact, if possible.
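
The flush-on-every-record change above and the disabled auto increment go together: a streaming process consumes records at its own pace, so it must tell the framework how many it has actually finished. As the new test below shows, it does so by writing a counter update line of the form reporter:counter:<group>,<counter>,<amount> to stderr. A minimal sketch of such a mapper, not part of this commit and with a hypothetical class name; the group and counter strings come from the Counters.Application constants added further below:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class CountingStreamMapper {
      public static void main(String[] args) throws Exception {
        BufferedReader in =
            new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);  // emit the record unchanged
          // report this record as fully processed
          System.err.println(
              "reporter:counter:ApplicationCounters,MapProcessedRecords,1");
        }
      }
    }

Reporting after every record keeps the framework's view of the in-flight record range tight; the App class in the test below batches its reports in groups of ten instead, which is equally valid.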

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SkipBadRecords;
+
+public class TestStreamingBadRecords extends ClusterMapReduceTestCase
+{
+
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestStreamingBadRecords");
+  
+  private static final List<String> MAPPER_BAD_RECORDS = 
+    Arrays.asList("hey022","hey023","hey099");
+  
+  private static final List<String> REDUCER_BAD_RECORDS = 
+    Arrays.asList("hey001","hey024");
+  
+  private static final String badMapper = 
+    StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
+  private static final String badReducer = 
+    StreamUtil.makeJavaCommand(BadApp.class, new String[]{"true"});
+  private static final int INPUTSIZE=100;
+  
+  public TestStreamingBadRecords() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+  
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), 
+        "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    //increasing the record size so that we have stream flushing
+    String prefix = new String(new byte[20*1024]);
+    for(int i=1;i<=INPUTSIZE;i++) {
+      String str = ""+i;
+      int zerosToPrepend = 3 - str.length();
+      for(int j=0;j<zerosToPrepend;j++){
+        str = "0"+str;
+      }
+      wr.write(prefix + "hey"+str+"\n");
+    }
+    wr.close();
+  }
+  
+  private void validateOutput(RunningJob runningJob, boolean validateCount) 
+    throws Exception {
+    LOG.info(runningJob.getCounters().toString());
+    assertTrue(runningJob.isSuccessful());
+    List<String> badRecs = new ArrayList<String>();
+    badRecs.addAll(MAPPER_BAD_RECORDS);
+    badRecs.addAll(REDUCER_BAD_RECORDS);
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+    
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String value = tokeniz.nextToken();
+        int index = value.indexOf("hey");
+        assertTrue(index>-1);
+        if(index>-1) {
+          String heyStr = value.substring(index);
+          assertTrue(!badRecs.contains(heyStr));
+        }
+        
+        line = reader.readLine();
+      }
+      reader.close();
+      if(validateCount) {
+        assertEquals(INPUTSIZE-badRecs.size(), counter);
+      }
+    }
+  }
+
+  public void testDisableSkip() throws Exception {
+    JobConf clusterConf = createJobConf();
+    createInput();
+    
+    //the number of attempts needed to successfully complete the task
+    //depends on the number of bad records.
+    int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+                          +1+MAPPER_BAD_RECORDS.size();
+    int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+                           +1+REDUCER_BAD_RECORDS.size();
+    String[] args =  new String[] {
+      "-input", (new Path(getInputDir(), "text.txt")).toString(),
+      "-output", getOutputDir().toString(),
+      "-mapper", badMapper,
+      "-reducer", badReducer,
+      "-verbose",
+      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
+      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
+      "-jobconf", "mapred.skip.mode.enabled=false",
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "mapred.reduce.tasks=1",
+      "-jobconf", "mapred.task.timeout=30000",
+      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
+      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
+      "-jobconf", "mapred.job.tracker.http.address="
+                    +clusterConf.get("mapred.job.tracker.http.address"),
+      "-jobconf", "stream.debug=set",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+    StreamJob job = new StreamJob(args, false);      
+    job.go();
+    assertFalse(job.running_.isSuccessful());
+  }
+  
+  public void testSkip() throws Exception {
+    JobConf clusterConf = createJobConf();
+    createInput();
+    
+    //the number of attempts needed to successfully complete the task
+    //depends on the number of bad records.
+    int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+                          +1+MAPPER_BAD_RECORDS.size();
+    int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+                           +1+REDUCER_BAD_RECORDS.size();
+    
+    String[] args =  new String[] {
+      "-input", (new Path(getInputDir(), "text.txt")).toString(),
+      "-output", getOutputDir().toString(),
+      "-mapper", badMapper,
+      "-reducer", badReducer,
+      "-verbose",
+      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
+      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "mapred.reduce.tasks=1",
+      "-jobconf", "mapred.task.timeout=30000",
+      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
+      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
+      "-jobconf", "mapred.job.tracker.http.address="
+                    +clusterConf.get("mapred.job.tracker.http.address"),
+      "-jobconf", "stream.debug=set",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+    StreamJob job = new StreamJob(args, false);      
+    job.go();
+    validateOutput(job.running_, false);
+  }
+  
+  static class App{
+    boolean isReducer;
+    
+    public App(String[] args) throws Exception{
+      if(args.length>0) {
+        isReducer = Boolean.parseBoolean(args[0]);
+      }
+      String counter = Counters.Application.MAP_PROCESSED_RECORDS;
+      if(isReducer) {
+        counter = Counters.Application.REDUCE_PROCESSED_RECORDS;
+      }
+      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+      String line;
+      int count = 0;
+      while ((line = in.readLine()) != null) {
+        processLine(line);
+        count++;
+        if(count>=10) {
+          System.err.println("reporter:counter:"+Counters.Application.GROUP+","+
+              counter+","+count);
+          count = 0;
+        }
+      }
+    }
+    
+    protected void processLine(String line) throws Exception{
+      System.out.println(line);
+    }
+    
+    
+    public static void main(String[] args) throws Exception{
+      new App(args);
+    }
+  }
+  
+  static class BadApp extends App{
+    
+    public BadApp(String[] args) throws Exception {
+      super(args);
+    }
+
+    protected void processLine(String line) throws Exception {
+      List<String> badRecords = MAPPER_BAD_RECORDS;
+      if(isReducer) {
+        badRecords = REDUCER_BAD_RECORDS;
+      }
+      if(badRecords.size()>0 && line.contains(badRecords.get(0))) {
+        LOG.warn("Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(badRecords.size()>1 && line.contains(badRecords.get(1))) {
+        LOG.warn("Encountered BAD record");
+        throw new Exception("Got bad record..crashing");
+      }
+      else if(badRecords.size()>2 && line.contains(badRecords.get(2))) {
+        LOG.warn("Encountered BAD record");
+        Thread.sleep(15*60*1000);
+      }
+      super.processLine(line);
+    }
+    
+    public static void main(String[] args) throws Exception{
+      new BadApp(args);
+    }
+  }
+  
+  
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Mon Aug 11 05:03:37 2008
@@ -488,4 +488,14 @@
     }
     return buffer.toString();
   }
+  
+  public static class Application {
+    //special counters which are written by the application and are 
+    //used by the framework.
+    public static final String GROUP = "ApplicationCounters";
+    public static final String MAP_PROCESSED_RECORDS = "MapProcessedRecords";
+    public static final String REDUCE_PROCESSED_RECORDS = 
+      "ReduceProcessedRecords";
+    
+  }
 }
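
Java jobs that disable the auto increment flags (see SkipBadRecords below) because they buffer or asynchronously process records can drive these counters through the Reporter, in the same way MapRunner does in its change further below. A hedged sketch with a hypothetical mapper class:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class ManualCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text> {
      public void map(LongWritable key, Text value,
          OutputCollector<LongWritable, Text> output, Reporter reporter)
          throws IOException {
        output.collect(key, value);
        // the record is now fully consumed, so advance the counter
        reporter.incrCounter(Counters.Application.GROUP,
            Counters.Application.MAP_PROCESSED_RECORDS, 1);
      }
    }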

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Aug 11 05:03:37 2008
@@ -44,8 +44,9 @@
    * version 12 changes the counters representation for HADOOP-1915
    * version 13 added call getBuildVersion() for HADOOP-236
    * Version 14: replaced getFilesystemName with getSystemDir for HADOOP-3135
+   * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Mon Aug 11 05:03:37 2008
@@ -91,6 +91,11 @@
                                                         int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
+
+    public void reportNextRecordRange(TaskAttemptID taskid, 
+        SortedRanges.Range range) throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
   }
   
   private static ClassLoader makeClassLoader(JobConf conf, 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Mon Aug 11 05:03:37 2008
@@ -277,6 +277,11 @@
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
+    
+    public void reportNextRecordRange(TaskAttemptID taskid, 
+        SortedRanges.Range range) throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
 
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java Mon Aug 11 05:03:37 2008
@@ -27,10 +27,13 @@
     implements MapRunnable<K1, V1, K2, V2> {
   
   private Mapper<K1, V1, K2, V2> mapper;
+  private boolean incrProcCount;
 
   @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
+    this.incrProcCount = job.getBoolean("mapred.skip.on", false) && 
+      SkipBadRecords.getAutoIncrMapperProcCount(job);
   }
 
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -44,6 +47,10 @@
       while (input.next(key, value)) {
         // map pair to output
         mapper.map(key, value, output, reporter);
+        if(incrProcCount) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+        }
       }
     } finally {
       mapper.close();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Aug 11 05:03:37 2008
@@ -31,6 +31,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -142,11 +143,19 @@
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private Iterator<Long> skipFailedRecIndexIterator;
+    private TaskUmbilicalProtocol umbilical;
+    private long recIndex = -1;
+    private long beforePos = -1;
+    private long afterPos = -1;
     
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) {
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters, 
+        TaskUmbilicalProtocol umbilical) {
       rawIn = raw;
+      this.umbilical = umbilical;
       inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
       inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+      skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
     }
 
     public K createKey() {
@@ -158,17 +167,34 @@
     }
      
     public synchronized boolean next(K key, V value)
-      throws IOException {
-
-      setProgress(getProgress());
-      long beforePos = getPos();
-      boolean ret = rawIn.next(key, value);
+    throws IOException {
+      boolean ret = moveToNext(key, value);
+      if(isSkipping() && ret) {
+        long nextRecIndex = skipFailedRecIndexIterator.next();
+        long skip = nextRecIndex - recIndex;
+        for(int i=0;i<skip && ret;i++) {
+          ret = moveToNext(key, value);
+        }
+        getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
+        reportNextRecordRange(umbilical, nextRecIndex);
+      }
       if (ret) {
         inputRecordCounter.increment(1);
-        inputByteCounter.increment(getPos() - beforePos);
+        inputByteCounter.increment(afterPos - beforePos);
       }
       return ret;
     }
+     
+    private synchronized boolean moveToNext(K key, V value)
+      throws IOException {
+      setProgress(getProgress());
+      beforePos = getPos();
+      boolean ret = rawIn.next(key, value);
+      recIndex++;
+      afterPos = getPos();
+      return ret;
+    }
+    
     public long getPos() throws IOException { return rawIn.getPos(); }
     public void close() throws IOException { rawIn.close(); }
     public float getProgress() throws IOException {
@@ -218,7 +244,8 @@
       
     RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = new TrackedRecordReader(rawIn, getCounters());
+    RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+    job.setBoolean("mapred.skip.on", isSkipping());
 
     MapRunnable runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
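
To make the new TrackedRecordReader.next()/moveToNext() split concrete, a worked trace with hypothetical numbers. Suppose a previous attempt failed on records 5 and 6, so the failed ranges hold [5,7) and the skip iterator yields 0 1 2 3 4 7 8 ...:

    6th call to next() : moveToNext() reads record 5, recIndex becomes 5
                         nextRecIndex = 7, so skip = 7 - 5 = 2 extra reads
                         recIndex ends at 7, MAP_SKIPPED_RECORDS grows by 2
    net effect         : the mapper never sees records 5 and 6, and the
                         range it is about to process is reported upward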

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Aug 11 05:03:37 2008
@@ -241,10 +241,45 @@
     }
   }
 
+  private class SkippingReduceValuesIterator<KEY,VALUE> 
+     extends ReduceValuesIterator<KEY,VALUE> {
+     private Iterator<Long> skipFailedRecIndexIterator;
+     private TaskUmbilicalProtocol umbilical;
+     private long recIndex = -1;
+     
+     public SkippingReduceValuesIterator(RawKeyValueIterator in,
+         RawComparator<KEY> comparator, Class<KEY> keyClass,
+         Class<VALUE> valClass, Configuration conf, Progressable reporter,
+         TaskUmbilicalProtocol umbilical) throws IOException {
+       super(in, comparator, keyClass, valClass, conf, reporter);
+       this.umbilical = umbilical;
+       skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
+       mayBeSkip();
+     }
+     
+     void nextKey() throws IOException {
+       super.nextKey();
+       mayBeSkip();
+     }
+     
+     private void mayBeSkip() throws IOException {
+       recIndex++;
+       long nextRecIndex = skipFailedRecIndexIterator.next();
+       long skip = nextRecIndex - recIndex;
+       for(int i=0;i<skip && super.more();i++) {
+         super.nextKey();
+         recIndex++;
+       }
+       getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+       reportNextRecordRange(umbilical, nextRecIndex);
+     }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    job.setBoolean("mapred.skip.on", isSkipping());
     Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
     // start thread that will handle communication with parent
@@ -314,14 +349,24 @@
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
+      boolean incrProcCount = isSkipping() &&
+        SkipBadRecords.getAutoIncrReducerProcCount(job);
       
-      ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
+      ReduceValuesIterator values = isSkipping() ? 
+          new SkippingReduceValuesIterator(rIter, 
+              job.getOutputValueGroupingComparator(), keyClass, valClass, 
+              job, reporter, umbilical) :
+          new ReduceValuesIterator(rIter, 
           job.getOutputValueGroupingComparator(), keyClass, valClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
+        if(incrProcCount) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.REDUCE_PROCESSED_RECORDS, 1);
+        }
         values.nextKey();
         values.informReduceProgress();
       }
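
Note that on the reduce side the unit of skipping is the key group rather than the individual record: mayBeSkip() advances over whole keys via super.nextKey(), and REDUCE_SKIPPED_RECORDS is incremented by the number of groups stepped over.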

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utility class for skip bad records functionality. It contains various 
+ * settings related to skipping of bad records.
+ */
+public class SkipBadRecords {
+  
+  private static final String ENABLED = "mapred.skip.mode.enabled";
+  private static final String ATTEMPTS_TO_START_SKIPPING = 
+    "mapred.skip.attempts.to.start.skipping";
+  private static final String AUTO_INCR_MAP_PROC_COUNT = 
+    "mapred.skip.map.auto.incr.proc.count";
+  private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
+    "mapred.skip.reduce.auto.incr.proc.count";
+  
+  /**
+   * Is skipping of bad records enabled. If it is enabled 
+   * the framework will try to find bad records and skip  
+   * them on further attempts.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if skipping is enabled
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getEnabled(Configuration conf) {
+    return conf.getBoolean(ENABLED, true);
+  }
+  
+  /**
+   * Set whether to enable skipping of bad records. If it is enabled 
+   * the framework will try to find bad records and will 
+   * try to skip them on further attempts.
+   * 
+   * @param conf the configuration
+   * @param enabled boolean to enable/disable skipping 
+   */
+  public static void setEnabled(Configuration conf, boolean enabled) {
+    conf.setBoolean(ENABLED, enabled);
+  }
+
+  /**
+   * Get the number of Task attempts AFTER which skip mode 
+   * will be kicked off. When skip mode is kicked off, the 
+   * task reports the range of records which it will process 
+   * next to the TaskTracker, so that on failures the TT knows 
+   * which records are possibly the bad ones. On further 
+   * executions, those are skipped.
+   * 
+   * @param conf the configuration
+   * @return the number of task attempts
+   */
+  public static int getAttemptsToStartSkipping(Configuration conf) {
+    return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
+  }
+
+  /**
+   * Set the number of Task attempts AFTER which skip mode 
+   * will be kicked off. When skip mode is kicked off, the 
+   * task reports the range of records which it will process 
+   * next to the TaskTracker, so that on failures the TT knows 
+   * which records are possibly the bad ones. On further 
+   * executions, those are skipped.
+   * 
+   * @param conf the configuration
+   * @param attemptsToStartSkipping the number of task attempts
+   */
+  public static void setAttemptsToStartSkipping(Configuration conf, 
+      int attemptsToStartSkipping) {
+    conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
+  }
+
+  /**
+   * Get the flag which if set to true, 
+   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * by MapRunner after invoking the map function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records, for example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if auto increment 
+   *                           Counters.Application.MAP_PROCESSED_RECORDS.
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getAutoIncrMapperProcCount(Configuration conf) {
+    return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
+  }
+  
+  /**
+   * Set the flag which if set to true, 
+   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * by MapRunner after invoking the map function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records, for example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @param autoIncr whether to auto increment 
+   *        Counters.Application.MAP_PROCESSED_RECORDS.
+   */
+  public static void setAutoIncrMapperProcCount(Configuration conf, 
+      boolean autoIncr) {
+    conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
+  }
+  
+  /**
+   * Get the flag which if set to true, 
+   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * by framework after invoking the reduce function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records, for example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if auto increment 
+   *                           Counters.Application.REDUCE_PROCESSED_RECORDS.
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getAutoIncrReducerProcCount(Configuration conf) {
+    return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
+  }
+  
+  /**
+   * Set the flag which if set to true, 
+   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * by framework after invoking the reduce function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records, for example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @param autoIncr whether to auto increment 
+   *        Counters.Application.REDUCE_PROCESSED_RECORDS.
+   */
+  public static void setAutoIncrReducerProcCount(Configuration conf, 
+      boolean autoIncr) {
+    conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
+  }
+  
+}
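
Putting the knobs together, a job turns the feature on purely through configuration. A minimal sketch, not part of this commit, with a hypothetical helper class:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SkipBadRecords;

    public class SkipConfExample {
      public static JobConf newSkippingConf(Class<?> jobClass) {
        JobConf conf = new JobConf(jobClass);
        SkipBadRecords.setEnabled(conf, true);              // on by default
        // report record ranges once two attempts have failed (the default)
        SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
        // a job that buffers records must drive the processed-record
        // counter itself, as streaming does above
        SkipBadRecords.setAutoIncrMapperProcCount(conf, false);
        return conf;
      }
    }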

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Keeps the Ranges sorted by startIndex.
+ * The added ranges are always ensured to be non-overlapping.
+ * Provides the SkipRangeIterator, which skips the Ranges 
+ * stored in this object.
+ */
+public class SortedRanges implements Writable{
+  
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.SortedRanges");
+  
+  private SortedSet<Range> ranges = new TreeSet<Range>();
+  private int indicesCount;
+  
+  /**
+   * Get Iterator which skips the stored ranges.
+   * The Iterator.next() call returns the index starting from 0.
+   * @return Iterator<Long>
+   */
+  public Iterator<Long> skipRangeIterator(){
+    return new SkipRangeIterator();
+  }
+  
+  /**
+   * Get the number of indices stored in the ranges.
+   * @return indices count
+   */
+  public synchronized int getIndicesCount() {
+    return indicesCount;
+  }
+  
+  /**
+   * Add the range indices. It is ensured that the added range 
+   * doesn't overlap the existing ranges. If it overlaps, the 
+   * existing overlapping ranges are removed and a single range 
+   * spanning the union of all the removed ranges and this range 
+   * is added. 
+   * If the range has 0 length, this does nothing.
+   * @param range Range to be added.
+   */
+  public synchronized void add(Range range){
+    if(range.isEmpty()) {
+      return;
+    }
+    
+    long startIndex = range.getStartIndex();
+    long endIndex = range.getEndIndex();
+    //make sure that there are no overlapping ranges
+    SortedSet<Range> headSet = ranges.headSet(range);
+    if(headSet.size()>0) {
+      Range previousRange = headSet.last();
+      LOG.debug("previousRange "+previousRange);
+      if(startIndex<previousRange.getEndIndex()) {
+        //previousRange overlaps this range
+        //remove the previousRange
+        if(ranges.remove(previousRange)) {
+          indicesCount-=previousRange.getLength();
+        }
+        //expand this range
+        startIndex = previousRange.getStartIndex();
+        endIndex = endIndex>=previousRange.getEndIndex() ?
+                          endIndex : previousRange.getEndIndex();
+      }
+    }
+    
+    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+    while(tailSetIt.hasNext()) {
+      Range nextRange = tailSetIt.next();
+      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
+          "  endIndex:"+endIndex);
+      if(endIndex>=nextRange.getStartIndex()) {
+        //nextRange overlaps this range
+        //remove the nextRange
+        tailSetIt.remove();
+        indicesCount-=nextRange.getLength();
+        if(endIndex<nextRange.getEndIndex()) {
+          //expand this range
+          endIndex = nextRange.getEndIndex();
+          break;
+        }
+      } else {
+        break;
+      }
+    }
+    add(startIndex,endIndex);
+  }
+  
+  /**
+   * Remove the range indices. If this range is  
+   * found in existing ranges, the existing ranges 
+   * are shrunk.
+   * If the range has 0 length, this does nothing.
+   * @param range Range to be removed.
+   */
+  public synchronized void remove(Range range) {
+    if(range.isEmpty()) {
+      return;
+    }
+    long startIndex = range.getStartIndex();
+    long endIndex = range.getEndIndex();
+    //make sure that there are no overlapping ranges
+    SortedSet<Range> headSet = ranges.headSet(range);
+    if(headSet.size()>0) {
+      Range previousRange = headSet.last();
+      LOG.debug("previousRange "+previousRange);
+      if(startIndex<previousRange.getEndIndex()) {
+        //previousRange overlaps this range
+        //narrow down the previousRange
+        if(ranges.remove(previousRange)) {
+          indicesCount-=previousRange.getLength();
+          LOG.debug("removed previousRange "+previousRange);
+        }
+        add(previousRange.getStartIndex(), startIndex);
+        if(endIndex<=previousRange.getEndIndex()) {
+          add(endIndex, previousRange.getEndIndex());
+        }
+      }
+    }
+    
+    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+    while(tailSetIt.hasNext()) {
+      Range nextRange = tailSetIt.next();
+      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
+          "  endIndex:"+endIndex);
+      if(endIndex>nextRange.getStartIndex()) {
+        //nextRange overlaps this range
+        //narrow down the nextRange
+        tailSetIt.remove();
+        indicesCount-=nextRange.getLength();
+        if(endIndex<nextRange.getEndIndex()) {
+          add(endIndex, nextRange.getEndIndex());
+          break;
+        }
+      } else {
+        break;
+      }
+    }
+  }
+  
+  private void add(long start, long end) {
+    if(end>start) {
+      Range recRange = new Range(start, end-start);
+      ranges.add(recRange);
+      indicesCount+=recRange.getLength();
+      LOG.debug("added "+recRange);
+    }
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    ranges = new TreeSet<Range>();
+    int size = in.readInt();
+    for(int i=0;i<size;i++) {
+      Range range = new Range();
+      range.readFields(in);
+      ranges.add(range);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(ranges.size());
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      range.write(out);
+    }
+  }
+  
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      sb.append(range.toString()+"\n");
+    }
+    return sb.toString();
+  }
+  
+  /**
+   * Index Range. Comprises a start index and a length.
+   * A Range can also have 0 length. The Range stores indices 
+   * of type long.
+   */
+  static class Range implements Comparable<Range>, Writable{
+    private long startIndex;
+    private long length;
+        
+    public Range(long startIndex, long length) {
+      if(length<0) {
+        throw new RuntimeException("length can't be negative");
+      }
+      this.startIndex = startIndex;
+      this.length = length;
+    }
+    
+    public Range() {
+      this(0,0);
+    }
+    
+    /**
+     * Get the start index. Start index is inclusive.
+     * @return startIndex. 
+     */
+    public long getStartIndex() {
+      return startIndex;
+    }
+    
+    /**
+     * Get the end index. End index is exclusive.
+     * @return endIndex.
+     */
+    public long getEndIndex() {
+      return startIndex + length;
+    }
+    
+    /**
+    * Get Length.
+    * @return length
+    */
+    public long getLength() {
+      return length;
+    }
+    
+    /**
+     * Range is empty if its length is zero.
+     * @return <code>true</code> if empty
+     *         <code>false</code> otherwise.
+     */
+    public boolean isEmpty() {
+      return length==0;
+    }
+    
+    public boolean equals(Object o) {
+      if(o!=null && o instanceof Range) {
+        Range range = (Range)o;
+        return startIndex==range.startIndex &&
+        length==range.length;
+      }
+      return false;
+    }
+    
+    public int hashCode() {
+      return Long.valueOf(startIndex).hashCode() +
+          Long.valueOf(length).hashCode();
+    }
+    
+    public int compareTo(Range o) {
+      if(this.equals(o)) {
+        return 0;
+      }
+      return (this.startIndex > o.startIndex) ? 1:-1;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      startIndex = in.readLong();
+      length = in.readLong();
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(startIndex);
+      out.writeLong(length);
+    }
+    
+    public String toString() {
+      return startIndex +":" + length;
+    }    
+  }
+  
+  /**
+   * Index Iterator which skips the stored ranges.
+   */
+  private class SkipRangeIterator implements Iterator<Long> {
+    Iterator<Range> rangeIterator = ranges.iterator();
+    Range range = new Range();
+    long currentIndex = -1;
+    
+    /**
+     * Returns true until the index reaches Long.MAX_VALUE.
+     * @return <code>true</code> next index exists.
+     *         <code>false</code> otherwise.
+     */
+    public boolean hasNext() {
+      return currentIndex<Long.MAX_VALUE;
+    }
+    
+    /**
+     * Get the next available index. The index starts from 0.
+     * @return next index
+     */
+    public synchronized Long next() {
+      currentIndex++;
+      LOG.debug("currentIndex "+currentIndex +"   "+range);
+      skipIfInRange();
+      while(currentIndex>=range.getEndIndex() && rangeIterator.hasNext()) {
+        range = rangeIterator.next();
+        skipIfInRange();
+      }
+      return currentIndex;
+    }
+    
+    private void skipIfInRange() {
+      if(currentIndex>=range.getStartIndex() && 
+          currentIndex<range.getEndIndex()) {
+        //need to skip the range
+        LOG.warn("Skipping index " + currentIndex +"-" + range.getEndIndex());
+        currentIndex = range.getEndIndex();
+        
+      }
+    }
+    
+    /**
+     * Remove is not supported. Doesn't apply.
+     */
+    public void remove() {
+      throw new UnsupportedOperationException("remove not supported.");
+    }
+    
+  }
+
+}
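
A short sketch of the resulting iterator behaviour, not part of the commit; it lives in org.apache.hadoop.mapred because Range is package-private:

    package org.apache.hadoop.mapred;

    import java.util.Iterator;

    public class SortedRangesDemo {
      public static void main(String[] args) {
        SortedRanges ranges = new SortedRanges();
        // mark record indices 2, 3 and 4 as bad: start index 2, length 3
        ranges.add(new SortedRanges.Range(2, 3));
        Iterator<Long> it = ranges.skipRangeIterator();
        for (int i = 0; i < 5; i++) {
          System.out.println(it.next());   // prints 0, 1, 5, 6, 7
        }
      }
    }

Because add() merges overlaps, adding Range(4, 3) afterwards would leave the single range [2,7).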

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Aug 11 05:03:37 2008
@@ -64,13 +64,15 @@
   protected static enum Counter { 
     MAP_INPUT_RECORDS, 
     MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_RECORDS
   }
   
   /**
@@ -109,6 +111,15 @@
  TaskStatus taskStatus;                          // current status of the task
   private Path taskOutputPath;                    // task-specific output dir
   
+  //failed ranges from previous attempts
+  private SortedRanges failedRanges = new SortedRanges();
+  private boolean skipping = false;
+  
+  //currently processing record start index
+  private volatile long currentRecStartIndex; 
+  private Iterator<Long> currentRecIndexIterator = 
+    failedRanges.skipRangeIterator();
+  
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
@@ -175,6 +186,37 @@
   protected synchronized void setPhase(TaskStatus.Phase phase){
     this.taskStatus.setPhase(phase); 
   }
+  
+  /**
+   * Get failed ranges.
+   * @return the failed ranges
+   */
+  public SortedRanges getFailedRanges() {
+    return failedRanges;
+  }
+
+  /**
+   * Set failed ranges.
+   * @param failedRanges the failed ranges
+   */
+  public void setFailedRanges(SortedRanges failedRanges) {
+    this.failedRanges = failedRanges;
+  }
+
+  /**
+   * Is Task in skipping mode.
+   */
+  public boolean isSkipping() {
+    return skipping;
+  }
+
+  /**
+   * Sets whether to run Task in skipping mode.
+   * @param skipping
+   */
+  public void setSkipping(boolean skipping) {
+    this.skipping = skipping;
+  }
 
   ////////////////////////////////////////////
   // Writable methods
@@ -190,6 +232,8 @@
       Text.writeString(out, "");
     }
     taskStatus.write(out);
+    failedRanges.write(out);
+    out.writeBoolean(skipping);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
@@ -203,6 +247,10 @@
     }
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
+    failedRanges.readFields(in);
+    currentRecIndexIterator = failedRanges.skipRangeIterator();
+    currentRecStartIndex = currentRecIndexIterator.next();
+    skipping = in.readBoolean();
   }
 
   @Override
@@ -370,6 +418,17 @@
           if (counters != null) {
             counters.incrCounter(group, counter, amount);
           }
+          if(skipping && Counters.Application.GROUP.equals(group) && (
+              Counters.Application.MAP_PROCESSED_RECORDS.equals(counter) ||
+              Counters.Application.REDUCE_PROCESSED_RECORDS.equals(counter))) {
+            //if the application reports the processed records, move 
+            //currentRecStartIndex to the next one.
+            //currentRecStartIndex is the start index which has not yet been 
+            //finished and is still in the task's stomach.
+            for(int i=0;i<amount;i++) {
+              currentRecStartIndex = currentRecIndexIterator.next();
+            }
+          }
           setProgressFlag();
         }
         public InputSplit getInputSplit() throws UnsupportedOperationException {
@@ -377,6 +436,25 @@
         }
       };
   }
+  
+  /**
+   *  Reports the next executing record range to TaskTracker.
+   *  
+   * @param umbilical
+   * @param nextRecIndex the record index which would be fed next.
+   * @throws IOException
+   */
+  protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical, 
+      long nextRecIndex) throws IOException{
+    //currentRecStartIndex is the start index which has not yet been finished 
+    //and is still in the task's stomach.
+    long len = nextRecIndex - currentRecStartIndex +1;
+    SortedRanges.Range range = 
+      new SortedRanges.Range(currentRecStartIndex, len);
+    taskStatus.setNextRecordRange(range);
+    LOG.debug("sending reportNextRecordRange " + range);
+    umbilical.reportNextRecordRange(taskId, range);
+  }
 
   public void setProgress(float progress) {
     taskProgress.set(progress);
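
The arithmetic in reportNextRecordRange, with hypothetical values: if the application has acknowledged everything before record 10 (currentRecStartIndex == 10) and the reader is about to feed record 12, the task reports

    start  = currentRecStartIndex                    = 10
    length = nextRecIndex - currentRecStartIndex + 1 = 12 - 10 + 1 = 3

i.e. records 10 through 12 are still in the task's stomach and become the suspect range if this attempt fails.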

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Mon Aug 11 05:03:37 2008
@@ -77,6 +77,8 @@
   private int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
+  private volatile SortedRanges failedRanges = new SortedRanges();
+  private volatile boolean skipping = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -485,6 +487,12 @@
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
       machinesWhereFailed.add(trackerHostName);
+      LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
+      failedRanges.add(status.getNextRecordRange());
+      if(SkipBadRecords.getEnabled(conf) && 
+          numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
+        skipping = true;
+      }
     } else {
       numKilledTasks++;
     }
@@ -677,7 +685,7 @@
     // in more depth eventually...
     //
       
-    if (activeTasks.size() <= MAX_TASK_EXECS &&
+    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
         (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
@@ -709,12 +717,16 @@
     }
 
     if (isMapTask()) {
+      LOG.debug("attemdpt "+  numTaskFailures   +
+          " sending skippedRecords "+failedRanges.getIndicesCount());
       t = new MapTask(jobFile, taskid, partition, 
           rawSplit.getClassName(), rawSplit.getBytes());
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     t.setConf(conf);
+    t.setFailedRanges(failedRanges);
+    t.setSkipping(skipping);
     tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
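
Concretely, with the default mapred.skip.attempts.to.start.skipping of 2, the first two attempts of a TIP run normally; once numTaskFailures reaches 2, skipping flips to true and the next attempt is launched with both skip mode and the failed ranges accumulated so far. The !skipping guard added above also keeps a skipping TIP out of speculative execution, presumably because two concurrent attempts skipping different ranges would disagree about what was processed.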

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Mon Aug 11 05:03:37 2008
@@ -56,6 +56,7 @@
   private Phase phase = Phase.STARTING; 
   private Counters counters;
   private boolean includeCounters;
+  private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
 
   public TaskStatus() {}
 
@@ -89,6 +90,23 @@
   }
   public String getStateString() { return stateString; }
   public void setStateString(String stateString) { this.stateString = stateString; }
+  
+  /**
+   * Get the next record range which is going to be processed by Task.
+   * @return nextRecordRange
+   */
+  public SortedRanges.Range getNextRecordRange() {
+    return nextRecordRange;
+  }
+
+  /**
+   * Set the next record range which is going to be processed by Task.
+   * @param nextRecordRange
+   */
+  public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
+    this.nextRecordRange = nextRecordRange;
+  }
+  
   /**
    * Get task finish time. if shuffleFinishTime and sortFinishTime 
    * are not set before, these are set to finishTime. It takes care of 
@@ -247,6 +265,7 @@
     this.progress = status.getProgress();
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
+    this.nextRecordRange = status.getNextRecordRange();
 
     setDiagnosticInfo(status.getDiagnosticInfo());
     
@@ -297,6 +316,7 @@
     if (includeCounters) {
       counters.write(out);
     }
+    nextRecordRange.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -313,6 +333,7 @@
     if (includeCounters) {
       counters.readFields(in);
     }
+    nextRecordRange.readFields(in);
   }
   
   //////////////////////////////////////////////////////////////////////////////

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 11 05:03:37 2008
@@ -1574,6 +1574,10 @@
     public synchronized void reportDiagnosticInfo(String info) {
       this.diagnosticInfo.append(info);
     }
+    
+    public synchronized void reportNextRecordRange(SortedRanges.Range range) {
+      this.taskStatus.setNextRecordRange(range);
+    }
 
     /**
      * The task is reporting that it's done running
@@ -1977,6 +1981,17 @@
       LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
     }
   }
+  
+  public synchronized void reportNextRecordRange(TaskAttemptID taskid, 
+      SortedRanges.Range range) throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      tip.reportNextRecordRange(range);
+    } else {
+      LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " +
+      		"Ignored.");
+    }
+  }
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Mon Aug 11 05:03:37 2008
@@ -43,9 +43,11 @@
    * Version 8 changes {job|tip|task}id's to use their corresponding 
    * objects rather than strings.
    * Version 9 changes the counter representation for HADOOP-1915
+   * Version 10 changes the TaskStatus format and adds reportNextRecordRange
+   *            for HADOOP-153
    * */
 
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -68,6 +70,15 @@
    *  @param trace the text to report
    */
   void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+  
+  /**
+   * Report the record range which is going to be processed next by the Task.
+   * @param taskid the id of the task involved
+   * @param range the range of record sequence numbers
+   * @throws IOException
+   */
+  void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
+    throws IOException;
 
   /** Periodically called by child to check if parent is still alive. 
    * @return True if the task is known

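reportNextRecordRange gives the child JVM a way to tell its TaskTracker which records it is about to consume, so that a crash can be attributed to that range. A minimal sketch of the child-side call, assuming it lives in org.apache.hadoop.mapred (the protocol interface is package-private) and that the surrounding task code supplies the proxy, attempt id, and range:

    // A minimal sketch of the child-side call; reportRange is a
    // hypothetical helper, not part of this patch.
    package org.apache.hadoop.mapred;

    import java.io.IOException;

    class RangeReportSketch {
      static void reportRange(TaskUmbilicalProtocol umbilical,
          TaskAttemptID taskId, SortedRanges.Range next) throws IOException {
        // Report before consuming the range: if the child dies inside it,
        // the TaskTracker's last reported range identifies the suspects.
        umbilical.reportNextRecordRange(taskId, next);
      }
    }
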
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Mon Aug 11 05:03:37 2008
@@ -6,10 +6,12 @@
 MAP_INPUT_BYTES.name=          Map input bytes
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
+MAP_SKIPPED_RECORDS.name=      Map skipped records
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records
 REDUCE_INPUT_GROUPS.name=      Reduce input groups
 REDUCE_INPUT_RECORDS.name=     Reduce input records
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
+REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Aug 11 05:03:37 2008
@@ -19,12 +19,14 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,6 +60,7 @@
   private ExecutorService executorService;
   private volatile IOException ioException;
   private volatile RuntimeException runtimeException;
+  private boolean incrProcCount;
 
   @SuppressWarnings("unchecked")
   public void configure(JobConf jobConf) {
@@ -69,6 +72,8 @@
     }
 
     this.job = jobConf;
+    this.incrProcCount = job.getBoolean("mapred.skip.on", false) && 
+      SkipBadRecords.getAutoIncrMapperProcCount(job);
     this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
         jobConf);
 
@@ -222,6 +227,10 @@
       try {
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
+        if(incrProcCount) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+        }
       } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance
         // variable of the MultithreadedMapRunner from where it will be

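The incrProcCount logic above is the contract a map runner observes in skip mode: increment the processed-record counter once per fully processed record, so the framework can pin a failure on the first record that never completed. A minimal sketch of a custom MapRunnable following the same pattern (the class itself is hypothetical; the configuration key and counter constants are the ones this patch uses):

    // A minimal sketch (hypothetical class) of a custom MapRunnable that
    // honors the skip-mode counting contract used by MultithreadedMapRunner.
    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.util.ReflectionUtils;

    public class CountingMapRunnerSketch<K1, V1, K2, V2>
        implements MapRunnable<K1, V1, K2, V2> {
      private Mapper<K1, V1, K2, V2> mapper;
      private boolean incrProcCount;

      @SuppressWarnings("unchecked")
      public void configure(JobConf job) {
        this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
        // Count only when skip mode is on and one map() call consumes
        // exactly one input record.
        this.incrProcCount = job.getBoolean("mapred.skip.on", false) &&
            SkipBadRecords.getAutoIncrMapperProcCount(job);
      }

      public void run(RecordReader<K1, V1> input,
          OutputCollector<K2, V2> output, Reporter reporter)
          throws IOException {
        try {
          K1 key = input.createKey();
          V1 value = input.createValue();
          while (input.next(key, value)) {
            mapper.map(key, value, output, reporter);
            if (incrProcCount) {
              // Everything up to this count finished cleanly, so a later
              // crash points at the first record beyond it.
              reporter.incrCounter(Counters.Application.GROUP,
                  Counters.Application.MAP_PROCESSED_RECORDS, 1);
            }
          }
        } finally {
          mapper.close();
        }
      }
    }
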
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Mon Aug 11 05:03:37 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 
 /**
  * An adaptor to run a C++ mapper.
@@ -42,6 +43,9 @@
    */
   public void configure(JobConf job) {
     this.job = job;
+    //disable the auto increment of the counter. For pipes, the number of
+    //processed records can be equal to or less than the number of input records.
+    SkipBadRecords.setAutoIncrMapperProcCount(job, false);
   }
 
   /**
@@ -65,6 +69,7 @@
     boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
     downlink.runMap(reporter.getInputSplit(), 
                     job.getNumReduceTasks(), isJavaInput);
+    boolean skipping = job.getBoolean("mapred.skip.on", false);
     try {
       if (isJavaInput) {
         // allocate key & value instances that are re-used for all entries
@@ -76,6 +81,11 @@
         while (input.next(key, value)) {
           // map pair to output
           downlink.mapItem(key, value);
+          if(skipping) {
+            //flush the streams on every record input if running in skip mode
+            //so that we don't buffer other records surrounding a bad record.
+            downlink.flush();
+          }
         }
         downlink.endOfInput();
       }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java Mon Aug 11 05:03:37 2008
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -41,9 +42,14 @@
   private Application<K2, V2, K3, V3> application = null;
   private DownwardProtocol<K2, V2> downlink = null;
   private boolean isOk = true;
+  private boolean skipping = false;
 
   public void configure(JobConf job) {
     this.job = job;
+    //disable the auto increment of the counter. For pipes, the number of
+    //processed records can be equal to or less than the number of input records.
+    SkipBadRecords.setAutoIncrReducerProcCount(job, false);
+    skipping = job.getBoolean("mapred.skip.on", false);
   }
 
   /**
@@ -59,6 +65,11 @@
     while (values.hasNext()) {
       downlink.reduceValue(values.next());
     }
+    if(skipping) {
+      //flush the streams on every record input if running in skip mode
+      //so that we don't buffer other records surrounding a bad record.
+      downlink.flush();
+    }
     isOk = true;
   }
 

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class TestBadRecords extends ClusterMapReduceTestCase {
+  
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestBadRecords");
+  
+  private static final List<String> MAPPER_BAD_RECORDS = 
+    Arrays.asList("hello01","hello04","hello05");
+  
+  private static final List<String> REDUCER_BAD_RECORDS = 
+    Arrays.asList("hello08","hello10");
+  
+  private List<String> input;
+  
+  public TestBadRecords() {
+    input = new ArrayList<String>();
+    for(int i=1;i<=10;i++) {
+      String str = ""+i;
+      int zerosToPrepend = 2 - str.length();
+      for(int j=0;j<zerosToPrepend;j++){
+        str = "0"+str;
+      }
+      input.add("hello"+str);
+    }
+  }
+  
+  private void runMapReduce(JobConf conf, 
+      List<String> mapperBadRecords, List<String> redBadRecords) 
+        throws Exception {
+    createInput();
+    conf.setJobName("mr");
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setInt("mapred.task.timeout", 30*1000);
+    
+    //the number of attempts needed to successfully complete the task
+    //depends on the number of bad records.
+    conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
+        mapperBadRecords.size());
+    conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+
+        1+redBadRecords.size());
+    
+    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    RunningJob runningJob = JobClient.runJob(conf);
+    validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
+  }
+  
+  
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), 
+        "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    for(String inp : input) {
+      wr.write(inp+"\n");
+    }
+    wr.close();
+  }
+  
+  private void validateOutput(JobConf conf, RunningJob runningJob, 
+      List<String> mapperBadRecords, List<String> redBadRecords) 
+    throws Exception{
+    LOG.info(runningJob.getCounters().toString());
+    assertTrue(runningJob.isSuccessful());
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+    
+    List<String> mapperOutput=getProcessed(input, mapperBadRecords);
+    LOG.debug("mapperOutput " + mapperOutput.size());
+    List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
+    LOG.debug("reducerOutput " + reducerOutput.size());
+    
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String key = tokeniz.nextToken();
+        String value = tokeniz.nextToken();
+        LOG.debug("Output: key:"+key + "  value:"+value);
+        assertTrue(value.contains("hello"));
+        
+        assertTrue(reducerOutput.contains(value));
+        line = reader.readLine();
+      }
+      reader.close();
+      assertEquals(reducerOutput.size(), counter);
+    }
+  }
+  
+  private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
+    List<String> processed = new ArrayList<String>();
+    for(String input : inputs) {
+      if(!badRecs.contains(input)) {
+        processed.add(input);
+      }
+    }
+    return processed;
+  }
+  
+  public void testBadMapRed() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(BadMapper.class);
+    conf.setReducerClass(BadReducer.class);
+    runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
+  }
+  
+  static class BadMapper extends MapReduceBase implements 
+    Mapper<LongWritable, Text, LongWritable, Text> {
+    
+    public void map(LongWritable key, Text val,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      String str = val.toString();
+      LOG.debug("MAP key:" +key +"  value:" + str);
+      if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        throw new RuntimeException("Bad record "+str);
+      }
+      else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
+        try {
+          LOG.warn("MAP Encountered BAD record");
+          Thread.sleep(15*60*1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      output.collect(key, val);
+    }
+  }
+  
+  static class BadReducer extends MapReduceBase implements 
+    Reducer<LongWritable, Text, LongWritable, Text> {
+    
+    public void reduce(LongWritable key, Iterator<Text> values,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      while(values.hasNext()) {
+        Text value = values.next();
+        LOG.debug("REDUCE key:" +key +"  value:" + value);
+        if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
+          LOG.warn("REDUCE Encountered BAD record");
+          System.exit(-1);
+        }
+        else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
+          try {
+            LOG.warn("REDUCE Encountered BAD record");
+            Thread.sleep(15*60*1000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        output.collect(key, value);
+      }
+      
+    }
+  }
+  
+}

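The attempt budget set in runMapReduce above reflects how skipping converges: the framework begins skipping only after getAttemptsToStartSkipping(conf) failed attempts, each subsequent attempt in the worst case eliminates a single bad record, and one final attempt runs clean. A worked example, assuming the default attempts-to-start-skipping value of 2 and the bad-record lists defined in this test:

    maxMapAttempts    = 2 (before skipping starts) + 3 (one per bad map record) + 1 (clean run) = 6
    maxReduceAttempts = 2 + 2 + 1 = 5
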
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+
+public class TestSortedRanges extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestSortedRanges");
+  
+  public void testAdd() {
+    SortedRanges sr = new SortedRanges();
+    sr.add(new Range(2,9));
+    assertEquals(9, sr.getIndicesCount());
+    
+    sr.add(new SortedRanges.Range(3,5));
+    assertEquals(9, sr.getIndicesCount());
+    
+    sr.add(new SortedRanges.Range(7,1));
+    assertEquals(9, sr.getIndicesCount());
+    
+    sr.add(new Range(1,12));
+    assertEquals(12, sr.getIndicesCount());
+    
+    sr.add(new Range(7,9));
+    assertEquals(15, sr.getIndicesCount());
+    
+    sr.add(new Range(31,10));
+    sr.add(new Range(51,10));
+    sr.add(new Range(66,10));
+    assertEquals(45, sr.getIndicesCount());
+    
+    sr.add(new Range(21,50));
+    assertEquals(70, sr.getIndicesCount());
+    
+    LOG.debug(sr);
+    
+    Iterator<Long> it = sr.skipRangeIterator();
+    int i = 0;
+    assertEquals(i, it.next().longValue());
+    for(i=16;i<21;i++) {
+      assertEquals(i, it.next().longValue());
+    }
+    assertEquals(76, it.next().longValue());
+    assertEquals(77, it.next().longValue());
+    
+  }
+  
+  public void testRemove() {
+    SortedRanges sr = new SortedRanges();
+    sr.add(new Range(2,19));
+    assertEquals(19, sr.getIndicesCount());
+    
+    sr.remove(new SortedRanges.Range(15,8));
+    assertEquals(13, sr.getIndicesCount());
+    
+    sr.remove(new SortedRanges.Range(6,5));
+    assertEquals(8, sr.getIndicesCount());
+    
+    sr.remove(new SortedRanges.Range(8,4));
+    assertEquals(7, sr.getIndicesCount());
+    
+    sr.add(new Range(18,5));
+    assertEquals(12, sr.getIndicesCount());
+    
+    sr.add(new Range(25,1));
+    assertEquals(13, sr.getIndicesCount());
+    
+    sr.remove(new SortedRanges.Range(7,24));
+    assertEquals(4, sr.getIndicesCount());
+    
+    sr.remove(new SortedRanges.Range(5,1));
+    assertEquals(3, sr.getIndicesCount());
+    
+    LOG.debug(sr);
+  }
+
+}
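
The assertions above pin down the semantics of SortedRanges: add() merges overlapping ranges so that getIndicesCount() is the size of the union of all [startIndex, startIndex + length) intervals, remove() subtracts, and skipRangeIterator() yields only the indices outside every range. A brute-force model that mirrors the union-count behaviour testAdd asserts (this is not the SortedRanges implementation, just an illustrative sketch):

    // A brute-force model (not the real SortedRanges) of the union-count
    // semantics asserted by testAdd.
    import java.util.Set;
    import java.util.TreeSet;

    public class RangeUnionSketch {
      private final Set<Long> indices = new TreeSet<Long>();

      public void add(long startIndex, long length) {
        for (long i = startIndex; i < startIndex + length; i++) {
          indices.add(i); // overlapping ranges collapse in the set
        }
      }

      public long getIndicesCount() {
        return indices.size();
      }

      public static void main(String[] args) {
        RangeUnionSketch sr = new RangeUnionSketch();
        sr.add(2, 9);  // indices 2..10          -> 9
        sr.add(3, 5);  // fully contained        -> still 9
        sr.add(7, 1);  // fully contained        -> still 9
        sr.add(1, 12); // extends cover to 1..12 -> 12
        System.out.println(sr.getIndicesCount()); // prints 12
      }
    }

In the test, the subsequent add of Range(7,9) extends the cover to 1..15, which is why the expected count then grows to 15.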