You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/04 07:34:39 UTC

svn commit: r771181 - in /hadoop/core/trunk: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/

Author: ddas
Date: Mon May  4 05:34:36 2009
New Revision: 771181

URL: http://svn.apache.org/viewvc?rev=771181&view=rev
Log:
HADOOP-5266. Adds the capability to do mark/reset of the reduce values iterator in the Context object API. Contributed by Jothi Padmanabhan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/mapred-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May  4 05:34:36 2009
@@ -104,6 +104,9 @@
     HADOOP-5752. Add a new hdfs image processor, Delimited, to oiv. (Jakob
     Homan via szetszwo)
 
+    HADOOP-5266. Adds the capability to do mark/reset of the reduce values 
+    iterator in the Context object API. (Jothi Padmanabhan via ddas)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Mon May  4 05:34:36 2009
@@ -524,6 +524,14 @@
 </property>
 
 <property>
+  <name>mapred.job.reduce.markreset.buffer.percent</name>
+  <value>0.0</value>
+  <description>The fraction of memory (0.0 to 1.0), relative to the maximum heap size, to
+  be used for caching values when using the mark-reset functionality.
+  </description>
+</property>
+
+<property>
   <name>mapred.map.tasks.speculative.execution</name>
   <value>true</value>
   <description>If true, then multiple instances of some map tasks 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java Mon May  4 05:34:36 2009
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.IFile.InMemoryReader;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * <code>BackupStore</code> is a utility class that is used to support
+ * the mark-reset functionality of values iterator
+ *
+ * <p>It has two caches - a memory cache and a file cache where values are
+ * stored as they are iterated, after a mark. On reset, values are retrieved
+ * from these caches. Framework moves from the memory cache to the 
+ * file cache when the memory cache becomes full.
+ * 
+ */
+public class BackupStore<K,V> {
+
+  private static final Log LOG = LogFactory.getLog(BackupStore.class.getName());
+  private static final int MAX_VINT_SIZE = 9;
+  private static final int EOF_MARKER_SIZE = 2 * MAX_VINT_SIZE;
+  private final TaskAttemptID tid;
+ 
+  private MemoryCache memCache;
+  private FileCache fileCache;
+
+  List<Segment<K,V>> segmentList = new LinkedList<Segment<K,V>>();
+  private int readSegmentIndex = 0;
+  private int firstSegmentOffset = 0;
+
+  private int currentKVOffset = 0;
+  private int nextKVOffset = -1;
+
+  private DataInputBuffer currentKey = null;
+  private DataInputBuffer currentValue = new DataInputBuffer();
+  private DataInputBuffer currentDiskValue = new DataInputBuffer();
+ 
+  private boolean hasMore = false;
+  private boolean inReset = false;
+  private boolean clearMarkFlag = false;
+  private boolean lastSegmentEOF = false;
+
+  public BackupStore(Configuration conf, TaskAttemptID taskid)
+  throws IOException {
+    
+    final float bufferPercent =
+      conf.getFloat("mapred.job.reduce.markreset.buffer.percent", 0f);
+
+    if (bufferPercent > 1.0 || bufferPercent < 0.0) {
+      throw new IOException("mapred.job.reduce.markreset.buffer.percent" +
+          bufferPercent);
+    }
+
+    int maxSize = (int)Math.min(
+        Runtime.getRuntime().maxMemory() * bufferPercent, Integer.MAX_VALUE);
+
+    // Support an absolute size also.
+    int tmp = conf.getInt("mapred.job.reduce.markreset.buffer.size", 0);
+    if (tmp >  0) {
+      maxSize = tmp;
+    }
+
+    memCache = new MemoryCache(maxSize);
+    fileCache = new FileCache(conf);
+    tid = taskid;
+    
+    LOG.info("Created a new BackupStore with a memory of " + maxSize);
+
+  }
+
+  /**
+   * Write the given K,V to the cache. 
+   * Write to memcache if space is available, else write to the filecache
+   * @param key
+   * @param value
+   * @throws IOException
+   */
+  public void write(DataInputBuffer key, DataInputBuffer value)
+  throws IOException {
+
+    assert (key != null && value != null);
+
+    if (fileCache.isActive()) {
+      fileCache.write(key, value);
+      return;
+    }
+
+    if (memCache.reserveSpace(key, value)) {
+      memCache.write(key, value);
+    } else {
+      fileCache.activate();
+      fileCache.write(key, value);
+    }
+  }
+
+  public void mark() throws IOException {
+
+    // We read one KV pair in advance in hasNext. 
+    // If hasNext has read the next KV pair from a new segment, but the
+    // user has not called next() for that KV, then reset the readSegmentIndex
+    // to the previous segment
+
+    if (nextKVOffset == 0) {
+      assert (readSegmentIndex != 0);
+      assert (currentKVOffset != 0);
+      readSegmentIndex --;
+    }
+
+    // just drop segments before the current active segment
+
+    int i = 0;
+    Iterator<Segment<K,V>> itr = segmentList.iterator();
+    while (itr.hasNext()) {
+      Segment<K,V> s = itr.next();
+      if (i == readSegmentIndex) {
+        break;
+      }
+      s.close();
+      itr.remove();
+      i++;
+      LOG.debug("Dropping a segment");
+    }
+
+    // FirstSegmentOffset is the offset in the current segment from where we
+    // need to start reading on the next reset
+
+    firstSegmentOffset = currentKVOffset;
+    readSegmentIndex = 0;
+
+    LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
+  }
+
+  public void reset() throws IOException {
+
+    // Create a new segment for the previously written records only if we
+    // are not already in the reset mode
+    
+    if (!inReset) {
+      if (fileCache.isActive) {
+        fileCache.createInDiskSegment();
+      } else {
+        memCache.createInMemorySegment();
+      }
+    } 
+
+    inReset = true;
+    
+    // Reset the segments to the correct position from where the next read
+    // should begin. 
+    for (int i = 0; i < segmentList.size(); i++) {
+      Segment<K,V> s = segmentList.get(i);
+      if (s.inMemory()) {
+        int offset = (i == 0) ? firstSegmentOffset : 0;
+        s.getReader().reset(offset);
+      } else {
+        s.closeReader();
+        if (i == 0) {
+          s.reinitReader(firstSegmentOffset);
+          s.getReader().disableChecksumValidation();
+        }
+      }
+    }
+    
+    currentKVOffset = firstSegmentOffset;
+    nextKVOffset = -1;
+    readSegmentIndex = 0;
+    hasMore = false;
+    lastSegmentEOF = false;
+
+    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
+        " Segment List Size is " + segmentList.size());
+  }
+
+  public boolean hasNext() throws IOException {
+    
+    if (lastSegmentEOF) {
+      return false;
+    }
+    
+    // We read the next KV from the cache to decide if there is any left.
+    // Since hasNext can be called several times before the actual call to 
+    // next(), we use hasMore to avoid extra reads. hasMore is set to false
+    // when the user actually consumes this record in next()
+
+    if (hasMore) {
+      return true;
+    }
+
+    Segment<K,V> seg = segmentList.get(readSegmentIndex);
+    // Mark the current position. This would be set to currentKVOffset
+    // when the user consumes this record in next(). 
+    nextKVOffset = (int) seg.getActualPosition();
+    if (seg.nextRawKey()) {
+      currentKey = seg.getKey();
+      seg.getValue(currentValue);
+      hasMore = true;
+      return true;
+    } else {
+      if (!seg.inMemory()) {
+        seg.closeReader();
+      }
+    }
+
+    // If this is the last segment, mark the lastSegmentEOF flag and return
+    if (readSegmentIndex == segmentList.size() - 1) {
+      nextKVOffset = -1;
+      lastSegmentEOF = true;
+      return false;
+    }
+
+    nextKVOffset = 0;
+    readSegmentIndex ++;
+
+    Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
+    
+    // We possibly are moving from a memory segment to a disk segment.
+    // Reset so that we do not corrupt the in-memory segment buffer.
+    // See HADOOP-5494
+    
+    if (!nextSegment.inMemory()) {
+      currentValue.reset(currentDiskValue.getData(), 
+          currentDiskValue.getLength());
+      nextSegment.init(null);
+    }
+ 
+    if (nextSegment.nextRawKey()) {
+      currentKey = nextSegment.getKey();
+      nextSegment.getValue(currentValue);
+      hasMore = true;
+      return true;
+    } else {
+      throw new IOException("New segment did not have even one K/V");
+    }
+  }
+
+  public void next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("iterate past last value");
+    }
+    // Reset hasMore. See comment in hasNext()
+    hasMore = false;
+    currentKVOffset = nextKVOffset;
+    nextKVOffset = -1;
+  }
+
+  public DataInputBuffer nextValue() {
+    return  currentValue;
+  }
+
+  public DataInputBuffer nextKey() {
+    return  currentKey;
+  }
+
+  public void reinitialize() throws IOException {
+    if (segmentList.size() != 0) {
+      clearSegmentList();
+    }
+    memCache.reinitialize(true);
+    fileCache.reinitialize();
+    readSegmentIndex = firstSegmentOffset = 0;
+    currentKVOffset = 0;
+    nextKVOffset = -1;
+    hasMore = inReset = clearMarkFlag = false;
+  }
+
+  /**
+   * This function is called by the ValuesIterator when a mark is called
+   * outside of a reset zone.  
+   */
+  public void exitResetMode() throws IOException { 
+    inReset = false;
+    if (clearMarkFlag ) {
+      // If a flag was set to clear mark, do the reinit now.
+      // See clearMark()
+      reinitialize();
+      return;
+    }
+    if (!fileCache.isActive) {
+      memCache.reinitialize(false);
+    }
+  }
+
+  /** For writing the first key and value bytes directly from the
+   *  value iterators, pass the current underlying output stream
+   *  @param length The length of the impending write
+   */
+  public DataOutputStream getOutputStream(int length) throws IOException {
+    if (memCache.reserveSpace(length)) {
+      return memCache.dataOut;
+    } else {
+      fileCache.activate();
+      return fileCache.writer.getOutputStream();
+    }
+  }
+
+  /** This method is called by the valueIterators after writing the first
+   *  key and value bytes to the BackupStore
+   * @param length 
+   */
+  public void updateCounters(int length) {
+    if (fileCache.isActive) {
+      fileCache.writer.updateCountersForExternalAppend(length);
+    } else {
+      memCache.usedSize += length;
+    }
+  }
+
+  public void clearMark() throws IOException {
+    if (inReset) {
+      // If we are in the reset mode, we just mark a flag and come out
+      // The actual re initialization would be done when we exit the reset
+      // mode
+      clearMarkFlag = true;
+    } else {
+      reinitialize();
+    }
+  }
+  
+  private void clearSegmentList() throws IOException {
+    for (Segment<K,V> segment: segmentList) {
+      segment.close();
+    }
+    segmentList.clear();
+  }
+
+  class MemoryCache {
+    private DataOutputBuffer dataOut;
+    private int blockSize;
+    private int usedSize;
+    private final BackupRamManager ramManager;
+
+    // Memory cache is made up of blocks.
+    private int defaultBlockSize = 1024 * 1024;
+
+    public MemoryCache(int maxSize) {
+      ramManager = new BackupRamManager(maxSize);
+      if (maxSize < defaultBlockSize) {
+        defaultBlockSize = maxSize;
+      }
+    }
+
+    /**
+     * Re-initialize the memory cache.
+     * 
+     * @param clearAll If true, re-initialize the ramManager also.
+     */
+    void reinitialize(boolean clearAll) {
+      if (clearAll) {
+        ramManager.reinitialize();
+      }
+      int allocatedSize = createNewMemoryBlock(defaultBlockSize, 
+          defaultBlockSize);
+      assert(allocatedSize == defaultBlockSize || allocatedSize == 0);
+      LOG.debug("Created a new mem block of " + allocatedSize);
+    }
+
+    private int createNewMemoryBlock(int requestedSize, int minSize) {
+      int allocatedSize = ramManager.reserve(requestedSize, minSize);
+      usedSize = 0;
+      if (allocatedSize == 0) {
+        dataOut = null;
+        blockSize = 0;
+      } else {
+        dataOut = new DataOutputBuffer(allocatedSize);
+        blockSize = allocatedSize;
+      }
+      return allocatedSize;
+    }
+
+    /**
+     * This method determines if there is enough space left in the 
+     * memory cache to write to the requested length + space for
+     * subsequent EOF markers.
+     * @param length
+     * @return true if enough space is available
+     */
+    boolean reserveSpace(int length) throws IOException {
+      int availableSize = blockSize - usedSize;
+      if (availableSize >= length + EOF_MARKER_SIZE) {
+        return true;
+      }
+      // Not enough available. Close this block 
+      assert (!inReset); 
+
+      createInMemorySegment();
+      
+      // Create a new block
+      int tmp = Math.max(length + EOF_MARKER_SIZE, defaultBlockSize);
+      availableSize = createNewMemoryBlock(tmp, 
+          (length + EOF_MARKER_SIZE));
+      
+      return (availableSize == 0) ? false : true;
+    }
+
+    boolean reserveSpace(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      int valueLength = value.getLength() - value.getPosition();
+
+      int requestedSize = keyLength + valueLength + 
+        WritableUtils.getVIntSize(keyLength) +
+        WritableUtils.getVIntSize(valueLength);
+      return reserveSpace(requestedSize);
+    }
+    
+    /**
+     * Write the key and value to the cache in the IFile format
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    public void write(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      int valueLength = value.getLength() - value.getPosition();
+      WritableUtils.writeVInt(dataOut, keyLength);
+      WritableUtils.writeVInt(dataOut, valueLength);
+      dataOut.write(key.getData(), key.getPosition(), keyLength);
+      dataOut.write(value.getData(), value.getPosition(), valueLength);
+      usedSize += keyLength + valueLength + 
+        WritableUtils.getVIntSize(keyLength) +
+        WritableUtils.getVIntSize(valueLength);
+      LOG.debug("ID: " + segmentList.size() + " WRITE TO MEM");
+    }
+
+    /**
+     * This method creates a memory segment from the existing buffer
+     * @throws IOException
+     */
+    void createInMemorySegment () throws IOException {
+
+      // If nothing was written in this block because the record size
+      // was greater than the allocated block size, just return.
+      if (usedSize == 0) {
+        ramManager.unreserve(blockSize);
+        return;
+      }
+
+      // reserveSpace would have ensured that there is enough space
+      // left for the EOF markers.
+      assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
+  
+      WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
+      WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
+
+      usedSize += EOF_MARKER_SIZE;
+
+      ramManager.unreserve(blockSize - usedSize);
+
+      Reader<K, V> reader = 
+        new InMemoryReader<K, V>(ramManager, 
+            (org.apache.hadoop.mapred.TaskAttemptID) tid, 
+            dataOut.getData(), 0, usedSize);
+      Segment<K, V> segment = new Segment<K, V>(reader, false);
+      segmentList.add(segment);
+      LOG.debug("Added Memory Segment to List. List Size is " + 
+          segmentList.size());
+    }
+  }
+
+  class FileCache {
+    private LocalDirAllocator lDirAlloc;
+    private final Configuration conf;
+    private final FileSystem fs;
+    private boolean isActive = false;
+
+    private Path file = null;
+    private IFile.Writer<K,V> writer = null;
+    private int spillNumber = 0;
+
+    public FileCache(Configuration conf)
+    throws IOException {
+      this.conf = conf;
+      this.fs = FileSystem.getLocal(conf);
+      this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    }
+
+    void write(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      if (writer == null) {
+        // If spillNumber is 0, we should have called activate and not
+        // come here at all
+        assert (spillNumber != 0); 
+        writer = createSpillFile();
+      }
+      writer.append(key, value);
+      LOG.debug("ID: " + segmentList.size() + " WRITE TO DISK");
+    }
+
+    void reinitialize() {
+      spillNumber = 0;
+      writer = null;
+      isActive = false;
+    }
+
+    void activate() throws IOException {
+      isActive = true;
+      writer = createSpillFile();
+    }
+
+    void createInDiskSegment() throws IOException {
+      assert (writer != null);
+      writer.close();
+      Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
+      writer = null;
+      segmentList.add(s);
+      LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
+    }
+
+    boolean isActive() { return isActive; }
+
+    private Writer<K,V> createSpillFile() throws IOException {
+      Path tmp = new Path(
+          TaskTracker.getIntermediateOutputDir(
+              tid.getJobID().toString(), tid.toString()) + 
+              "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+
+      LOG.info("Created file: " + tmp);
+
+      file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), 
+          -1, conf);
+      return new Writer<K, V>(conf, fs, file);
+    }
+  }
+
+  static class BackupRamManager implements RamManager {
+
+    private int availableSize = 0;
+    private final int maxSize;
+
+    public BackupRamManager(int size) {
+      availableSize = maxSize = size;
+    }
+
+    @Override
+    public boolean reserve(int requestedSize, InputStream in) {
+      // Not used
+      LOG.warn("Reserve(int, InputStream) not supported by BackupRamManager");
+      return false;
+    }
+
+    int reserve(int requestedSize) {
+      if (availableSize == 0) {
+        return 0;
+      }
+      int reservedSize = Math.min(requestedSize, availableSize);
+      availableSize -= reservedSize;
+      LOG.debug("Reserving: " + reservedSize + " Requested: " + requestedSize);
+      return reservedSize;
+    }
+
+    int reserve(int requestedSize, int minSize) {
+      if (availableSize < minSize) {
+        LOG.debug("No Space available. Available: " + availableSize + 
+            " MinSize: " + minSize);
+        return 0;
+      } else {
+        return reserve(requestedSize);
+      }
+    }
+
+    @Override
+    public void unreserve(int requestedSize) {
+      availableSize += requestedSize;
+      LOG.debug("Unreserving: " + requestedSize +
+          ". Available: " + availableSize);
+    }
+    
+    void reinitialize() {
+      availableSize = maxSize;
+    }
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Mon May  4 05:34:36 2009
@@ -19,6 +19,7 @@
 
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -50,7 +51,7 @@
  */
 class IFile {
 
-  private static final int EOF_MARKER = -1;
+  static final int EOF_MARKER = -1;
   
   /**
    * <code>IFile.Writer</code> to write out intermediate map-outputs. 
@@ -111,18 +112,31 @@
       
       this.keyClass = keyClass;
       this.valueClass = valueClass;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
-      this.keySerializer.open(buffer);
-      this.valueSerializer = serializationFactory.getSerializer(valueClass);
-      this.valueSerializer.open(buffer);
+
+      if (keyClass != null) {
+        SerializationFactory serializationFactory = 
+          new SerializationFactory(conf);
+        this.keySerializer = serializationFactory.getSerializer(keyClass);
+        this.keySerializer.open(buffer);
+        this.valueSerializer = serializationFactory.getSerializer(valueClass);
+        this.valueSerializer.open(buffer);
+      }
+    }
+
+    public Writer(Configuration conf, FileSystem fs, Path file) 
+    throws IOException {
+      this(conf, fs, file, null, null, null, null);
     }
 
     public void close() throws IOException {
 
-      // Close the serializers
-      keySerializer.close();
-      valueSerializer.close();
+      // When IFile writer is created by BackupStore, we do not have
+      // Key and Value classes set. So, check before closing the
+      // serializers
+      if (keyClass != null) {
+        keySerializer.close();
+        valueSerializer.close();
+      }
 
       // Write EOF_MARKER for key/value length
       WritableUtils.writeVInt(out, EOF_MARKER);
@@ -226,6 +240,17 @@
       ++numRecordsWritten;
     }
     
+    // Required for mark/reset
+    public DataOutputStream getOutputStream () {
+      return out;
+    }
+    
+    // Required for mark/reset
+    public void updateCountersForExternalAppend(long length) {
+      ++numRecordsWritten;
+      decompressedBytesWritten += length;
+    }
+    
     public long getRawLength() {
       return decompressedBytesWritten;
     }
@@ -423,6 +448,15 @@
         readRecordsCounter.increment(numRecordsRead);
       }
     }
+    
+    public void reset(int offset) {
+      return;
+    }
+
+    public void disableChecksumValidation() {
+      checksumIn.disableChecksumValidation();
+    }
+
   }    
   
   /**
@@ -432,6 +466,8 @@
     RamManager ramManager;
     TaskAttemptID taskAttemptId;
     DataInputBuffer memDataIn = new DataInputBuffer();
+    private int start;
+    private int length;
     public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
                           byte[] data, int start, int length)
                           throws IOException {
@@ -442,6 +478,15 @@
       buffer = data;
       bufferSize = (int)fileLength;
       memDataIn.reset(buffer, start, length);
+      this.start = start;
+      this.length = length;
+    }
+
+    @Override
+    public void reset(int offset) {
+      memDataIn.reset(buffer, start + offset, length);
+      bytesRead = offset;
+      eof = false;
     }
     
     @Override

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Mon May  4 05:34:36 2009
@@ -41,6 +41,8 @@
   private byte csum[] = null;
   private int checksumSize;
   
+  private boolean disableChecksumValidation = false;
+  
   /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
@@ -155,6 +157,10 @@
     sum.update(b,off,bytesRead);
 
     currentOffset += bytesRead;
+
+    if (disableChecksumValidation) {
+      return bytesRead;
+    }
     
     if (currentOffset == dataLength) {
       // The last four bytes are checksum. Strip them and verify
@@ -183,4 +189,8 @@
   public byte[] getChecksum() {
     return csum;
   }
+
+  void disableChecksumValidation() {
+    disableChecksumValidation = true;
+  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Mon May  4 05:34:36 2009
@@ -163,7 +163,7 @@
       this.segmentLength = reader.getLength();
     }
 
-    private void init(Counters.Counter readsCounter) throws IOException {
+    void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
         in.seek(segmentOffset);
@@ -195,9 +195,15 @@
       reader.nextRawValue(value);
     }
 
+    void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+    
     void close() throws IOException {
-      reader.close();
-      
+      closeReader();
       if (!preserve && fs != null) {
         fs.delete(file, false);
       }
@@ -206,6 +212,27 @@
     public long getPosition() throws IOException {
       return reader.getPosition();
     }
+
+    // This method is used by BackupStore to extract the 
+    // absolute position after a reset
+    long getActualPosition() throws IOException {
+      return segmentOffset + reader.getPosition();
+    }
+
+    Reader<K,V> getReader() {
+      return reader;
+    }
+    
+    // This method is used by BackupStore to reinitialize the
+    // reader to start reading from a different segment offset
+    void reinitReader(int offset) throws IOException {
+      if (!inMemory()) {
+        closeReader();
+        segmentOffset = offset;
+        segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+        init(null);
+      }
+    }
   }
   
   private static class MergeQueue<K extends Object, V extends Object> 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java Mon May  4 05:34:36 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>MarkableIterator</code> is a wrapper iterator class that 
+ * implements the {@link MarkableIteratorInterface}.
+ * 
+ */
+public class MarkableIterator<VALUE> 
+  implements MarkableIteratorInterface<VALUE> {
+
+  MarkableIteratorInterface<VALUE> baseIterator;
+
+  /**
+   * Create a new iterator layered on the input iterator
+   * @param itr underlying iterator that implements MarkableIteratorInterface
+   */
+  public MarkableIterator(Iterator<VALUE> itr)  {
+    if (!(itr instanceof MarkableIteratorInterface)) {
+      throw new IllegalArgumentException("Input Iterator not markable");
+    }
+    baseIterator = (MarkableIteratorInterface<VALUE>) itr;
+  }
+
+  @Override
+  public void mark() throws IOException {
+    baseIterator.mark();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    baseIterator.reset();
+  }
+
+  @Override
+  public void clearMark() throws IOException {
+    baseIterator.clearMark();
+  }
+
+  @Override
+  public boolean hasNext() { 
+    return baseIterator.hasNext();
+  }
+
+  @Override
+  public VALUE next() {
+    return baseIterator.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove Not Implemented");
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java Mon May  4 05:34:36 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>MarkableIteratorInterface</code> is an interface for an iterator that 
+ * supports mark-reset functionality. 
+ *
+ * <p>Mark can be called at any point during the iteration process; a later
+ * reset rewinds the iteration to the record that was marked most recently.
+ * 
+ */
+interface MarkableIteratorInterface<VALUE> extends Iterator<VALUE> {
+  /**
+   * Mark the current record. A subsequent call to reset will rewind
+   * the iterator to this record.
+   * @throws IOException
+   */
+  void mark() throws IOException;
+  
+  /**
+   * Reset the iterator to the record marked by the most recent call to mark
+   * @throws IOException
+   */
+  void reset() throws IOException;
+  
+  /**
+   * Clear any previously set mark
+   * @throws IOException
+   */
+  void clearMark() throws IOException;
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Mon May  4 05:34:36 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -26,8 +27,11 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.util.Progressable;
 
@@ -54,7 +58,16 @@
   private DataInputBuffer buffer = new DataInputBuffer();
   private BytesWritable currentRawKey = new BytesWritable();
   private ValueIterable iterable = new ValueIterable();
-
+  private boolean isMarked = false;
+  private BackupStore<KEYIN,VALUEIN> backupStore;
+  private final SerializationFactory serializationFactory;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
+  private final Configuration conf;
+  private final TaskAttemptID taskid;
+  private int currentKeyLength = -1;
+  private int currentValueLength = -1;
+  
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input, 
                        Counter inputCounter,
@@ -69,12 +82,16 @@
     this.input = input;
     this.inputCounter = inputCounter;
     this.comparator = comparator;
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
     this.keyDeserializer.open(buffer);
     this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
     this.valueDeserializer.open(buffer);
     hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.conf = conf;
+    this.taskid = taskid;
   }
 
   /** Start processing next unique key. */
@@ -100,23 +117,31 @@
       return false;
     }
     firstValue = !nextKeyIsSame;
-    DataInputBuffer next = input.getKey();
-    currentRawKey.set(next.getData(), next.getPosition(), 
-                      next.getLength() - next.getPosition());
+    DataInputBuffer nextKey = input.getKey();
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
+                      nextKey.getLength() - nextKey.getPosition());
     buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
     key = keyDeserializer.deserialize(key);
-    next = input.getValue();
-    buffer.reset(next.getData(), next.getPosition(), next.getLength());
+    DataInputBuffer nextVal = input.getValue();
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
     value = valueDeserializer.deserialize(value);
+
+    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
+    currentValueLength = nextVal.getLength() - nextVal.getPosition();
+
+    if (isMarked) {
+      backupStore.write(nextKey, nextVal);
+    }
+
     hasMore = input.next();
     inputCounter.increment(1);
     if (hasMore) {
-      next = input.getKey();
+      nextKey = input.getKey();
       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
-                                         currentRawKey.getLength(),
-                                         next.getData(),
-                                         next.getPosition(),
-                                         next.getLength() - next.getPosition()
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
                                          ) == 0;
     } else {
       nextKeyIsSame = false;
@@ -132,16 +157,51 @@
   public VALUEIN getCurrentValue() {
     return value;
   }
+  
 
-  protected class ValueIterator implements Iterator<VALUEIN> {
+  
+  protected class ValueIterator implements MarkableIteratorInterface<VALUEIN> {
 
+    private boolean inReset = false;
+    private boolean clearMarkFlag = false;
+    
     @Override
     public boolean hasNext() {
+      try {
+        if (inReset && backupStore.hasNext()) { // replaying after reset()
+          return true;
+        } 
+      } catch (Exception e) { // hasNext() cannot throw checked exceptions
+        e.printStackTrace();
+        throw new RuntimeException("hasNext failed", e);
+      }
       return firstValue || nextKeyIsSame;
     }
 
     @Override
     public VALUEIN next() {
+      if (inReset) {
+        try {
+          if (backupStore.hasNext()) {
+            backupStore.next();
+            DataInputBuffer next = backupStore.nextValue();
+            buffer.reset(next.getData(), next.getPosition(), next.getLength());
+            value = valueDeserializer.deserialize(value);
+            return value;
+          } else {
+            inReset = false;
+            backupStore.exitResetMode();
+            if (clearMarkFlag) {
+              clearMarkFlag = false;
+              isMarked = false;
+            }
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException("next value iterator failed", e);
+        }
+      } 
+
       // if this is the first record, we don't need to advance
       if (firstValue) {
         firstValue = false;
@@ -168,7 +228,101 @@
     public void remove() {
       throw new UnsupportedOperationException("remove not implemented");
     }
-    
+
+    @Override
+    public void mark() throws IOException {
+      // The backup store is created lazily, on the first mark.
+      if (backupStore == null) {
+        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+      }
+      isMarked = true;
+      if (inReset) {
+        // Currently replaying from the store: let the store record the mark.
+        backupStore.mark();
+        return;
+      }
+      backupStore.reinitialize();
+      if (currentKeyLength == -1) {
+        return;  // next() not called yet: no current record to copy
+      }
+      assert (currentValueLength != -1);
+      int requestedSize = currentKeyLength + currentValueLength
+        + WritableUtils.getVIntSize(currentKeyLength)
+        + WritableUtils.getVIntSize(currentValueLength);
+      DataOutputStream out = backupStore.getOutputStream(requestedSize);
+      writeFirstKeyValueBytes(out);
+      backupStore.updateCounters(requestedSize);
+    }
+
+    @Override
+    public void reset() throws IOException {
+      final String noMark = "Reset called without a previous mark";
+      if (clearMarkFlag) {
+        // clearMark() was called during a replay: clear the pending flag
+        // and the store's mark, then report that there is no mark.
+        clearMarkFlag = false;
+        backupStore.clearMark();
+        throw new IOException(noMark);
+      }
+      if (!isMarked) {
+        throw new IOException(noMark);
+      }
+      // Enter replay mode; next() now reads from the backup store first.
+      inReset = true;
+      backupStore.reset();
+    }
+
+    @Override
+    public void clearMark() throws IOException {
+      if (backupStore != null) {
+        if (!inReset) {
+          isMarked = false;  // inReset is already false on this path
+          backupStore.reinitialize();
+        } else {
+          // Mid-replay: only flag it; next() drops the mark when replay ends.
+          clearMarkFlag = true;
+          backupStore.clearMark();
+        }
+      }
+    }
+	  
+    /**
+     * Called when the reducer moves from one key to another. Drops all
+     * mark/reset state, including a clearMark() still pending from a replay.
+     * @throws IOException
+     */
+    void resetBackupStore() throws IOException {
+      if (backupStore == null) {
+        return;
+      }
+      inReset = isMarked = clearMarkFlag = false;
+      backupStore.reinitialize();
+      currentKeyLength = -1;
+    }
+
+    /**
+     * Writes the record that was most recently served (before a call to
+     * mark) to <code>out</code>: first the key and value lengths as VInts,
+     * then the key and the value. Since the framework reads one record in
+     * advance, the record is rebuilt by serializing the current key and value.
+     * @param out stream to write the record to
+     * @throws IOException
+     */
+    private void writeFirstKeyValueBytes(DataOutputStream out) 
+    throws IOException {
+      assert (getCurrentKey() != null && getCurrentValue() != null);
+      WritableUtils.writeVInt(out, currentKeyLength);
+      WritableUtils.writeVInt(out, currentValueLength);
+      Serializer<KEYIN> keySerializer = 
+        serializationFactory.getSerializer(keyClass);
+      keySerializer.open(out);
+      keySerializer.serialize(getCurrentKey());
+
+      Serializer<VALUEIN> valueSerializer = 
+        serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(out);
+      valueSerializer.serialize(getCurrentValue());
+    }
   }
 
   protected class ValueIterable implements Iterable<VALUEIN> {
@@ -189,4 +343,4 @@
   Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
     return iterable;
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Mon May  4 05:34:36 2009
@@ -168,11 +168,15 @@
    * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
    * control how the reduce task works.
    */
+  @SuppressWarnings("unchecked")
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     while (context.nextKey()) {
       reduce(context.getCurrentKey(), context.getValues(), context);
+      // Drop any mark/reset backup state before moving on to the next key
+      ((ReduceContext.ValueIterator)
+          (context.getValues().iterator())).resetBackupStore();
     }
     cleanup(context);
   }
-}
\ No newline at end of file
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java Mon May  4 05:34:36 2009
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * A JUnit test to test the Map-Reduce framework's support for the
+ * "mark-reset" functionality in Reduce Values Iterator
+ */
+public class TestValueIterReset extends TestCase {
+  private static final int NUM_MAPS = 1;    // dummy input files created
+  private static final int NUM_TESTS = 4;   // keys 0..3, one testN() per key
+  private static final int NUM_VALUES = 40; // values emitted for each key
+  // Root of the test's input ("/in") and output ("/out") directories.
+  private static Path TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"));
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Log LOG =
+    LogFactory.getLog(TestValueIterReset.class);
+
+  /** Emits values 0..NUM_VALUES-1 under each key 0..NUM_TESTS-1 per record. */
+  public static class TestMapper
+  extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
+
+    public void map(LongWritable key, Text value, Context context)
+    throws IOException, InterruptedException {
+      IntWritable testId = new IntWritable();
+      IntWritable ordinal = new IntWritable();
+
+      for (int test = 0; test < NUM_TESTS; test++) {
+        testId.set(test);
+        for (int v = 0; v < NUM_VALUES; v++) {
+          ordinal.set(v);
+          context.write(testId, ordinal);
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the scenario selected by the key (0..3) on that key's values and
+   * emits (key, number of errors); any other key reports zero errors.
+   */
+  public static class TestReducer
+  extends Reducer< IntWritable,IntWritable,IntWritable,IntWritable> {
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+
+      // The scenarios need mark/reset, so wrap the values iterator first.
+      MarkableIterator<IntWritable> mitr =
+        new MarkableIterator<IntWritable>(values.iterator());
+      final int scenario = key.get();
+
+      // Each testN() returns its own error count.
+      int errors = 0;
+      if (scenario == 0) {
+        errors = test0(key, mitr);
+      } else if (scenario == 1) {
+        errors = test1(key, mitr);
+      } else if (scenario == 2) {
+        errors = test2(key, mitr);
+      } else if (scenario == 3) {
+        errors = test3(key, mitr);
+      }
+
+      context.write(key, new IntWritable(errors));
+    }
+  }
+
+  /**
+   * Test the most common use case. Mark before start of the iteration and
+   * reset at the end to go over the entire list
+   * @param key the reduce key, used only for logging
+   * @param values markable iterator over the values of <code>key</code>
+   * @return the number of errors detected (0 on success)
+   * @throws IOException
+   */
+  private static int test0(IntWritable key,
+                           MarkableIterator<IntWritable> values)
+  throws IOException {
+    int errors = 0;
+    IntWritable i;
+    ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+    LOG.info("Executing TEST:0 for Key:"+ key.toString());
+    values.mark();
+    LOG.info("TEST:0. Marking");
+    while (values.hasNext()) {
+      i = values.next();
+      // Store a copy: the iterator may return the same instance every time.
+      expectedValues.add(new IntWritable(i.get()));
+      LOG.info(key + ":" + i);
+    }
+
+    values.reset();
+    LOG.info("TEST:0. Reset");
+
+    int count = 0;
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+      // Compare contents; != on IntWritable only compares references.
+      if (i.get() != expectedValues.get(count).get()) {
+        LOG.info("TEST:0. Check:1 Expected: " + expectedValues.get(count) +
+            ", Got: " + i);
+        errors ++;
+        return errors;
+      }
+      count ++;
+    }
+
+    if (count < expectedValues.size()) {
+      LOG.info("TEST:0. Check:2 Iterator returned fewer values");
+      errors ++;
+      return errors;
+    }
+    LOG.info("TEST:0 Done");
+    return errors;
+  }
+
+  /**
+   * Test the case where we do a mark outside of a reset. Test for both file
+   * and memory caches. Served values are copied and compared by content.
+   * @param key the reduce key, used only for logging
+   * @param values markable iterator over the values of <code>key</code>
+   * @return the number of errors detected (0 on success)
+   * @throws IOException
+   */
+  private static int test1(IntWritable key, 
+                           MarkableIterator<IntWritable> values)
+  throws IOException {
+
+    IntWritable i;
+    int errors = 0;
+    int count = 0;
+
+    ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+    ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
+
+    LOG.info("Executing TEST:1 for Key:" + key);
+
+    values.mark();
+    LOG.info("TEST:1. Marking");
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+      expectedValues.add(new IntWritable(i.get()));  // copy; i may be reused
+      if (count == 2) {
+        break;
+      }
+      count ++;
+    }
+
+    values.reset();
+    LOG.info("TEST:1. Reset");
+    count = 0;
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count < expectedValues.size()) {
+        if (i.get() != expectedValues.get(count).get()) {
+          errors ++;
+          LOG.info("TEST:1. Check:1 Expected: " + expectedValues.get(count) +
+              ", Got: " + i);
+          return errors;
+        }
+      }
+
+      // We have moved past the first mark, but still in the memory cache
+      if (count == 3) {
+        values.mark();
+        LOG.info("TEST:1. Marking -- " + key + ": " + i);
+      }
+
+      if (count >= 3) {
+        expectedValues1.add(new IntWritable(i.get()));
+      }
+
+      if (count == 5) {
+        break;
+      }
+      count ++;
+    }
+
+    if (count < expectedValues.size()) {
+      LOG.info(("TEST:1 Check:2. Iterator returned lesser values"));
+      errors ++;
+      return errors;
+    }
+
+    values.reset();
+    count = 0;
+    LOG.info("TEST:1. Reset");
+    expectedValues.clear();
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count < expectedValues1.size()) {
+        if (i.get() != expectedValues1.get(count).get()) {
+          errors ++;
+          LOG.info("TEST:1. Check:3 Expected: " + expectedValues1.get(count)
+              + ", Got: " + i);
+          return errors;
+        }
+      }
+
+      // We have moved past the previous mark, but now we are in the file
+      // cache
+      if (count == 25) {
+        values.mark();
+        LOG.info("TEST:1. Marking -- " + key + ":" + i);
+      }
+
+      if (count >= 25) {
+        expectedValues.add(new IntWritable(i.get()));
+      }
+      count ++;
+    }
+
+    if (count < expectedValues1.size()) {
+      LOG.info(("TEST:1 Check:4. Iterator returned fewer values"));
+      errors ++;
+      return errors;
+    }
+
+    values.reset();
+    LOG.info("TEST:1. Reset");
+    count = 0;
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+      if (i.get() != expectedValues.get(count).get()) {
+        errors ++;
+        LOG.info("TEST:1. Check:5 Expected: " + expectedValues.get(count)
+            + ", Got: " + i);
+        return errors;
+      }
+      count ++;  // advance so each replayed value meets its own expectation
+    }
+
+    LOG.info("TEST:1 Done");
+    return errors;
+  }
+
+  /**
+   * Test the case where we do a mark inside a reset. Test for both file
+   * and memory. Served values are copied and compared by content.
+   * @param key the reduce key, used only for logging
+   * @param values markable iterator over the values of <code>key</code>
+   * @return the number of errors detected (0 on success)
+   * @throws IOException
+   */
+  private static int test2(IntWritable key,
+                           MarkableIterator<IntWritable> values)
+  throws IOException {
+
+    IntWritable i;
+    int errors = 0;
+    int count = 0;
+
+    ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+    ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
+
+    LOG.info("Executing TEST:2 for Key:" + key);
+
+    values.mark();
+    LOG.info("TEST:2 Marking");
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+      expectedValues.add(new IntWritable(i.get()));  // copy; i may be reused
+      if (count == 8) {
+        break;
+      }
+      count ++;
+    }
+
+    values.reset();
+    count = 0;
+    LOG.info("TEST:2 reset");
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count < expectedValues.size()) {
+        if (i.get() != expectedValues.get(count).get()) {
+          errors ++;
+          LOG.info("TEST:2. Check:1 Expected: " + expectedValues.get(count)
+              + ", Got: " + i);
+          return errors;
+        }
+      }
+
+      // We have moved past the first mark, but still reading from the
+      // memory cache
+      if (count == 3) {
+        values.mark();
+        LOG.info("TEST:2. Marking -- " + key + ":" + i);
+      }
+
+      if (count >= 3) {
+        expectedValues1.add(new IntWritable(i.get()));
+      }
+      count ++;
+    }
+
+    values.reset();
+    LOG.info("TEST:2. Reset");
+    expectedValues.clear();
+    count = 0;
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count < expectedValues1.size()) {
+        if (i.get() != expectedValues1.get(count).get()) {
+          errors ++;
+          LOG.info("TEST:2. Check:2 Expected: " + expectedValues1.get(count)
+              + ", Got: " + i);
+          return errors;
+        }
+      }
+
+      // We have moved past the previous mark, but now we are in the file
+      // cache
+      if (count == 20) {
+        values.mark();
+        LOG.info("TEST:2. Marking -- " + key + ":" + i);
+      }
+
+      if (count >= 20) {
+        expectedValues.add(new IntWritable(i.get()));
+      }
+      count ++;
+    }
+
+    values.reset();
+    count = 0;
+    LOG.info("TEST:2. Reset");
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+      if (i.get() != expectedValues.get(count).get()) {
+        errors ++;
+        LOG.info("TEST:2. Check:3 Expected: " + expectedValues.get(count)
+            + ", Got: " + i);
+        return errors;
+      }
+      count ++;  // advance so each replayed value meets its own expectation
+    }
+
+    LOG.info("TEST:2 Done");
+    return errors;
+  }
+
+  /**
+   * Test "clearMark". Served values are copied and compared by content.
+   * @param key the reduce key, used only for logging
+   * @param values markable iterator over the values of <code>key</code>
+   * @return the number of errors detected (0 on success)
+   * @throws IOException
+   */
+  private static int test3(IntWritable key,
+                              MarkableIterator<IntWritable> values)
+  throws IOException {
+
+    int errors = 0;
+    IntWritable i;
+
+    ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+
+    LOG.info("Executing TEST:3 for Key:" + key);
+
+    values.mark();
+    LOG.info("TEST:3. Marking");
+    int count = 0;
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count == 5) {
+        LOG.info("TEST:3. Clearing Mark");
+        values.clearMark();
+      }
+
+      if (count == 8) {
+        LOG.info("TEST:3. Marking -- " + key + ":" + i);
+        values.mark();
+      }
+
+      if (count >= 8) {
+        expectedValues.add(new IntWritable(i.get()));  // copy; i may be reused
+      }
+      count ++;
+    }
+
+    values.reset();
+    LOG.info("TEST:3. After reset");
+
+    if (!values.hasNext()) {
+      errors ++;
+      LOG.info("TEST:3, Check:1. HasNext returned false");
+      return errors;
+    }
+
+    count = 0;
+
+    while (values.hasNext()) {
+      i = values.next();
+      LOG.info(key + ":" + i);
+
+      if (count < expectedValues.size()) {
+        if (i.get() != expectedValues.get(count).get()) {
+          errors ++;
+          LOG.info("TEST:3. Check:2 Expected: " + expectedValues.get(count)
+              + ", Got: " + i);
+          return errors;
+        }
+      }
+
+      if (count == 10) {
+        values.clearMark();
+        LOG.info("TEST:3. After clear mark");
+      }
+      count ++;
+    }
+
+    boolean successfulClearMark = false;
+    try {
+      LOG.info("TEST:3. Before Reset");
+      values.reset();
+    } catch (IOException e) {
+      successfulClearMark = true;
+    }
+
+    if (!successfulClearMark) {
+      LOG.info("TEST:3 Check:4 reset was successfule even after clearMark");
+      errors ++;
+      return errors;
+    }
+
+    LOG.info("TEST:3 Done.");
+    return errors;
+  }
+
+
+  public void createInput() throws Exception {
+    // Each file holds a single dummy line; the files exist only to
+    // control the number of map tasks.
+    for (int fileNo = 0; fileNo < NUM_MAPS; fileNo++) {
+      String name = "test" + fileNo + ".txt";
+      Path file = new Path(TEST_ROOT_DIR+"/in", name);
+      localFs.delete(file, false);
+      Writer wr = new OutputStreamWriter(localFs.create(file));
+      wr.write("dummy");
+      wr.close();
+    }
+  }
+
+  /**
+   * Runs the map-reduce job and validates the reducers' output.
+   * Exceptions propagate so that JUnit reports the actual cause
+   * rather than a bare assertion failure.
+   */
+  public void testValueIterReset() throws Exception {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "TestValueIterReset") ;
+    job.setJarByClass(TestValueIterReset.class);
+    job.setMapperClass(TestMapper.class);
+    job.setReducerClass(TestReducer.class);
+    job.setNumReduceTasks(NUM_TESTS);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.getConfiguration().setInt("mapred.job.reduce.markreset.buffer.size",
+                                  128);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileInputFormat.addInputPath(job,
+        new Path(TEST_ROOT_DIR + "/in"));
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    localFs.delete(output, true);
+    FileOutputFormat.setOutputPath(job, output);
+    createInput();
+    assertTrue(job.waitForCompletion(true));
+    validateOutput();
+  }
+
+  private void validateOutput() throws IOException {
+    Path[] outputFiles = FileUtil.stat2Paths(
+        localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
+            new OutputLogFilter()));
+    assertTrue("no reduce output found", outputFiles.length > 0);
+    for (Path outputFile : outputFiles) {  // validate every output file
+      BufferedReader reader = new BufferedReader(
+          new InputStreamReader(localFs.open(outputFile)));
+      String line = reader.readLine();
+      while (line != null) {
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String key = tokeniz.nextToken();
+        String value = tokeniz.nextToken();
+        LOG.info("Output: key: "+ key + " value: "+ value);
+        assertTrue("errors for key " + key, Integer.parseInt(value) == 0);
+        line = reader.readLine();
+      }
+      reader.close();
+    }
+  }
+}